From c21465179b208fa9e13489a2d23f96ea455089bf Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 28 Sep 2020 20:41:30 +0200 Subject: [PATCH] [FLINK-19317] Remove unnecessary calls to setStreamTimeCharacteristic (scala) I'm just removing calls the set EventTime because that's the new default now. I'm also removing most calls to set ProcessingTime because it's not needed for making processing-time timers/windows work. I only left it for some tests that check specific failure behavior. I removed calls to set IngestionTime and replaced them by an explicit IngestionTimeWatermarkStrategy. I duplicated the same IngestionTimeWatermarkStrategy in all the examples/tests because I explicitly didn't want to add an IngestionTimeWatermarkStrategy in one of the core packages so that it is not discoverable because I think we shouldn't encourage users to use ingestion time. --- .../api/scala/AllWindowTranslationTest.scala | 33 ----------------- .../api/scala/BroadcastStateITCase.scala | 2 - .../api/scala/CoGroupJoinITCase.scala | 4 -- .../api/scala/IntervalJoinITCase.scala | 3 -- .../api/scala/SideOutputITCase.scala | 5 --- .../api/scala/WindowFunctionITCase.scala | 5 --- .../api/scala/WindowReduceITCase.scala | 7 ---- .../api/scala/WindowTranslationTest.scala | 37 ------------------- .../MatchRecognizeValidationTest.scala | 1 - .../LegacyTableSinkValidationTest.scala | 1 - .../harness/OverWindowHarnessTest.scala | 4 -- .../stream/sql/IntervalJoinITCase.scala | 2 - .../stream/sql/MatchRecognizeITCase.scala | 5 --- .../stream/sql/TemporalJoinITCase.scala | 2 - .../stream/table/LegacyTableSinkITCase.scala | 14 ------- .../runtime/utils/StreamingTestBase.scala | 2 - .../table/planner/utils/TableTestBase.scala | 3 -- .../validation/TableSinkValidationTest.scala | 1 - .../TableSourceValidationTest.scala | 1 - .../runtime/stream/TimeAttributesITCase.scala | 2 - .../runtime/stream/sql/InsertIntoITCase.scala | 1 - .../table/runtime/stream/sql/JoinITCase.scala | 13 ------- .../runtime/stream/sql/OverWindowITCase.scala | 1 - .../table/runtime/stream/sql/SortITCase.scala | 2 - .../table/runtime/stream/sql/SqlITCase.scala | 5 --- .../stream/sql/TemporalJoinITCase.scala | 2 - .../stream/table/GroupWindowITCase.scala | 9 ----- .../GroupWindowTableAggregateITCase.scala | 8 ---- .../stream/table/OverWindowITCase.scala | 3 -- .../stream/table/TableSinkITCase.scala | 2 - .../stream/table/TableSourceITCase.scala | 17 --------- 31 files changed, 197 deletions(-) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index bd94d3238a104..707df60e7f6a2 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.state.{AggregatingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation @@ -69,8 +68,6 @@ class AllWindowTranslationTest { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromElements(("hello", 1), ("hello", 2)) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - source .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) .reduce(new RichReduceFunction[(String, Int)] { @@ -89,8 +86,6 @@ class AllWindowTranslationTest { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromElements(("hello", 1), ("hello", 2)) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - source .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) .aggregate(new DummyRichAggregator()) @@ -133,7 +128,6 @@ class AllWindowTranslationTest { @Test def testMergingWindowsWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -172,7 +166,6 @@ class AllWindowTranslationTest { @Test def testReduceEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -205,7 +198,6 @@ class AllWindowTranslationTest { @Test def testReduceProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -238,7 +230,6 @@ class AllWindowTranslationTest { @Test def testReduceEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -271,7 +262,6 @@ class AllWindowTranslationTest { @Test def testReduceWithWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -311,7 +301,6 @@ class AllWindowTranslationTest { @Test def testReduceWithWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -350,7 +339,6 @@ class AllWindowTranslationTest { @Test def testReduceWithProcessWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -390,7 +378,6 @@ class AllWindowTranslationTest { @Test def testReduceWithProcessWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -429,7 +416,6 @@ class AllWindowTranslationTest { @Test def testApplyWithPreReducerEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -468,7 +454,6 @@ class AllWindowTranslationTest { @Test def testReduceWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -510,7 +495,6 @@ class AllWindowTranslationTest { @Test def testAggregateEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -543,7 +527,6 @@ class AllWindowTranslationTest { @Test def testAggregateProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -576,7 +559,6 @@ class AllWindowTranslationTest { @Test def testAggregateWithWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -609,7 +591,6 @@ class AllWindowTranslationTest { @Test def testAggregateWithWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -642,7 +623,6 @@ class AllWindowTranslationTest { @Test def testAggregateWithProcessWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -675,7 +655,6 @@ class AllWindowTranslationTest { @Test def testAggregateWithProcessWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -708,7 +687,6 @@ class AllWindowTranslationTest { @Test def testAggregateWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -749,7 +727,6 @@ class AllWindowTranslationTest { @Test def testApplyEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -788,7 +765,6 @@ class AllWindowTranslationTest { @Test def testApplyProcessingTimeTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -827,7 +803,6 @@ class AllWindowTranslationTest { @Test def testProcessEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -866,7 +841,6 @@ class AllWindowTranslationTest { @Test def testProcessProcessingTimeTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -905,7 +879,6 @@ class AllWindowTranslationTest { @Test def testApplyEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -941,7 +914,6 @@ class AllWindowTranslationTest { @Test def testReduceWithCustomTrigger() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -975,7 +947,6 @@ class AllWindowTranslationTest { @Test def testApplyWithCustomTrigger() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1015,7 +986,6 @@ class AllWindowTranslationTest { @Test def testProcessWithCustomTrigger() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1055,7 +1025,6 @@ class AllWindowTranslationTest { @Test def testReduceWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1091,7 +1060,6 @@ class AllWindowTranslationTest { @Test def testApplyWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1132,7 +1100,6 @@ class AllWindowTranslationTest { @Test def testProcessWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala index 142133cc7ad18..41e07e51963ab 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.state.MapStateDescriptor import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.streaming.api.functions.sink.RichSinkFunction @@ -55,7 +54,6 @@ class BroadcastStateITCase extends AbstractTestBase { 5L -> "test:5") val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val srcOne = env .generateSequence(0L, 5L) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index f6aede4184c43..958826246e27d 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -40,7 +39,6 @@ class CoGroupJoinITCase extends AbstractTestBase { CoGroupJoinITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -107,7 +105,6 @@ class CoGroupJoinITCase extends AbstractTestBase { CoGroupJoinITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { @@ -187,7 +184,6 @@ class CoGroupJoinITCase extends AbstractTestBase { CoGroupJoinITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala index 7815106068180..19ebf3b803c00 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor @@ -36,7 +35,6 @@ class IntervalJoinITCase extends AbstractTestBase { @Test def testInclusiveBounds(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dataStream1 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L)) @@ -74,7 +72,6 @@ class IntervalJoinITCase extends AbstractTestBase { @Test def testExclusiveBounds(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dataStream1 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L)) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala index ef77937977cca..4fb03011f92ba 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.scala import java.util -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, ProcessFunction} import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction, ProcessWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark @@ -154,7 +153,6 @@ class SideOutputITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4)) @@ -198,7 +196,6 @@ class SideOutputITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4)) @@ -244,7 +241,6 @@ class SideOutputITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4)) @@ -291,7 +287,6 @@ class SideOutputITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4)) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala index 5914d0b0b8393..9c5e7ed29592b 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala @@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -44,7 +43,6 @@ class WindowFunctionITCase extends TestLogger { CheckingIdentityRichWindowFunction.reset() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -93,7 +91,6 @@ class WindowFunctionITCase extends TestLogger { CheckingIdentityRichProcessWindowFunction.reset() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -142,7 +139,6 @@ class WindowFunctionITCase extends TestLogger { CheckingIdentityRichAllWindowFunction.reset() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -190,7 +186,6 @@ class WindowFunctionITCase extends TestLogger { CheckingIdentityRichProcessAllWindowFunction.reset() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala index 748d5a17f6231..69d612f6106df 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -48,7 +47,6 @@ class WindowReduceITCase extends AbstractTestBase { WindowReduceITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -102,7 +100,6 @@ class WindowReduceITCase extends AbstractTestBase { } val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -160,7 +157,6 @@ class WindowReduceITCase extends AbstractTestBase { } val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -211,7 +207,6 @@ class WindowReduceITCase extends AbstractTestBase { WindowReduceITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -263,7 +258,6 @@ class WindowReduceITCase extends AbstractTestBase { } val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { @@ -319,7 +313,6 @@ class WindowReduceITCase extends AbstractTestBase { } val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val source1 = env.addSource(new SourceFunction[(String, Int)]() { diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 6d1b33824a75b..58be63ebd2020 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.state.{AggregatingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, OutputTypeConfigurable} import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation @@ -62,8 +61,6 @@ class WindowTranslationTest { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromElements(("hello", 1), ("hello", 2)) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - source .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) @@ -83,8 +80,6 @@ class WindowTranslationTest { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromElements(("hello", 1), ("hello", 2)) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - source .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) @@ -136,7 +131,6 @@ class WindowTranslationTest { @Test def testMergingWindowsWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -175,7 +169,6 @@ class WindowTranslationTest { @Test def testReduceEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -209,7 +202,6 @@ class WindowTranslationTest { @Test def testReduceProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -243,7 +235,6 @@ class WindowTranslationTest { @Test def testReduceEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -277,7 +268,6 @@ class WindowTranslationTest { @Test def testReduceWithWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -318,7 +308,6 @@ class WindowTranslationTest { @Test def testReduceWithWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -359,7 +348,6 @@ class WindowTranslationTest { @Test def testReduceWithProcessWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -401,7 +389,6 @@ class WindowTranslationTest { @Test def testReduceWithProcessWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -443,7 +430,6 @@ class WindowTranslationTest { @Test def testApplyWithPreReducerEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -484,7 +470,6 @@ class WindowTranslationTest { @Test def testApplyWithPreReducerAndEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -528,7 +513,6 @@ class WindowTranslationTest { @Test def testReduceWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -571,7 +555,6 @@ class WindowTranslationTest { @Test def testAggregateEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -605,7 +588,6 @@ class WindowTranslationTest { @Test def testAggregateProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -639,7 +621,6 @@ class WindowTranslationTest { @Test def testAggregateWithWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -673,7 +654,6 @@ class WindowTranslationTest { @Test def testAggregateWithWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -707,7 +687,6 @@ class WindowTranslationTest { @Test def testAggregateWithProcessWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -741,7 +720,6 @@ class WindowTranslationTest { @Test def testAggregateWithProcessWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -775,7 +753,6 @@ class WindowTranslationTest { @Test def testAggregateWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -817,7 +794,6 @@ class WindowTranslationTest { @Test def testApplyEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -858,7 +834,6 @@ class WindowTranslationTest { @Test def testApplyProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -899,7 +874,6 @@ class WindowTranslationTest { @Test def testProcessEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -940,7 +914,6 @@ class WindowTranslationTest { @Test def testProcessProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -981,7 +954,6 @@ class WindowTranslationTest { @Test def testApplyEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1018,7 +990,6 @@ class WindowTranslationTest { @Test def testReduceWithCustomTrigger() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1053,7 +1024,6 @@ class WindowTranslationTest { @Test def testApplyWithCustomTrigger() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1095,7 +1065,6 @@ class WindowTranslationTest { @Test def testProcessWithCustomTrigger() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1138,7 +1107,6 @@ class WindowTranslationTest { @Test def testReduceWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1175,7 +1143,6 @@ class WindowTranslationTest { @Test def testReduceWithEvictorAndProcessFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1212,7 +1179,6 @@ class WindowTranslationTest { @Test def testAggregateWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1247,7 +1213,6 @@ class WindowTranslationTest { @Test def testAggregateWithEvictorAndProcessFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1282,7 +1247,6 @@ class WindowTranslationTest { @Test def testApplyWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) @@ -1325,7 +1289,6 @@ class WindowTranslationTest { @Test def testProcessWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala index 3fcab9f36ffd5..1a0bc25f65ef1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala @@ -35,7 +35,6 @@ import java.sql.Timestamp class MatchRecognizeValidationTest extends TableTestBase { private val streamUtil = scalaStreamTestUtil() - streamUtil.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamUtil.addDataStream[(Int, String, Timestamp)]( "MyTable", 'a, 'b, 'rowtime.rowtime, 'proctime.proctime) streamUtil.addDataStream[(String, Long, Int, Int)]( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala index 3f2061e761cdd..2d3fb5b1cc484 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala @@ -50,7 +50,6 @@ class LegacyTableSinkValidationTest extends TableTestBase { @Test(expected = classOf[TableException]) def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(TestData.tupleData3) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala index 8ce2138881251..3703c00b8159e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala @@ -391,7 +391,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode def testRowTimeBoundedRangeOver(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val t = env.fromCollection(data).toTable(tEnv, 'rowtime.rowtime, 'b, 'c) tEnv.registerTable("T", t) @@ -511,7 +510,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode def testRowTimeBoundedRowsOver(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val t = env.fromCollection(data).toTable(tEnv, 'rowtime.rowtime, 'b, 'c) tEnv.registerTable("T", t) @@ -664,7 +662,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode def testRowTimeUnboundedRangeOver(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val t = env.fromCollection(data).toTable(tEnv, 'rowtime.rowtime, 'b, 'c) tEnv.registerTable("T", t) @@ -807,7 +804,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode def testRowTimeUnboundedRowsOver(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val t = env.fromCollection(data).toTable(tEnv, 'rowtime.rowtime, 'b, 'c) tEnv.registerTable("T", t) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala index 8e9e52c59e829..249c63d7e01f9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api._ @@ -231,7 +230,6 @@ class IntervalJoinITCase(mode: StateBackendMode) extends StreamingWithStateTestB /** test row time inner join with equi-times **/ @Test def testRowTimeInnerJoinWithEquiTimeAttrs(): Unit = { - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala index d2e2e008a5f82..63874b32e0a36 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala @@ -21,7 +21,6 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ @@ -197,7 +196,6 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testEventsAreProperlyOrdered(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = Seq( @@ -255,7 +253,6 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testMatchRecognizeAppliedToWindowedGrouping(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] @@ -316,7 +313,6 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testWindowedGroupingAppliedToMatchRecognize(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] @@ -420,7 +416,6 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testPartitionByWithParallelSource(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala index 3d3d909f15f25..c78447ed63651 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala @@ -102,7 +102,6 @@ class TemporalJoinITCase(state: StateBackendMode) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -165,7 +164,6 @@ class TemporalJoinITCase(state: StateBackendMode) def testNestedTemporalJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/LegacyTableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/LegacyTableSinkITCase.scala index 47f2e7f0ee5d7..34d7523962419 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/LegacyTableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/LegacyTableSinkITCase.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.planner.runtime.stream.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ @@ -52,7 +51,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(4) @@ -89,7 +87,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testAppendSinkOnAppendTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -151,7 +148,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testAppendSinkOnAppendTableForInnerJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c) @@ -177,7 +173,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testRetractSinkOnUpdatingTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -213,7 +208,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testRetractSinkOnAppendTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -252,7 +246,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -291,7 +284,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -334,7 +326,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -379,7 +370,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -422,7 +412,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -465,7 +454,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testUpsertSinkWithFilter(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(4) @@ -503,7 +491,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testToAppendStreamMultiRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) @@ -522,7 +509,6 @@ class LegacyTableSinkITCase extends AbstractTestBase { def testToRetractStreamMultiRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala index cf289972905a5..9f106c4f25634 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.runtime.utils -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{EnvironmentSettings, ImplicitExpressionConversions} @@ -48,7 +47,6 @@ class StreamingTestBase extends AbstractTestBase { def before(): Unit = { this.env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(4) - this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) if (enableObjectReuse) { this.env.getConfig.enableObjectReuse() } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index f70689560dfd9..0618dde0c5c72 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -504,7 +504,6 @@ abstract class TableTestUtil( GlobalDataExchangeMode.ALL_EDGES_PIPELINED.toString) private val env: StreamExecutionEnvironment = getPlanner.getExecEnv - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) override def getTableEnv: TableEnvironment = tableEnv @@ -657,7 +656,6 @@ abstract class ScalaTableTestUtil( extends TableTestUtilBase(test, isStreamingMode) { // scala env val env = new ScalaStreamExecEnv(new LocalStreamEnvironment()) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // scala tableEnv val tableEnv: ScalaStreamTableEnv = ScalaStreamTableEnv.create(env, setting) @@ -691,7 +689,6 @@ abstract class JavaTableTestUtil( extends TableTestUtilBase(test, isStreamingMode) { // java env val env = new LocalStreamEnvironment() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // java tableEnv // use impl class instead of interface class to avoid // "Static methods in interface require -target:jvm-1.8" diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala index eb2a2deb22abb..6d8b24794d696 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala @@ -51,7 +51,6 @@ class TableSinkValidationTest extends TableTestBase { @Test(expected = classOf[TableException]) def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env) val t = StreamTestData.get3TupleDataStream(env) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala index 3c5a718708495..c5c350692a7ea 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala @@ -39,7 +39,6 @@ import java.util.Collections class TableSourceValidationTest extends TableTestBase{ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index c94ec42c8db30..909e8429f0434 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -62,7 +62,6 @@ class TimeAttributesITCase extends AbstractTestBase { (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world")) val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv: StreamTableEnvironment = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().useOldPlanner().build()) @@ -598,7 +597,6 @@ class TimeAttributesITCase extends AbstractTestBase { def testMaterializedRowtimeFilter(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().useOldPlanner().build()) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala index 35dcf6ac70040..bf5b299d93b3a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala @@ -38,7 +38,6 @@ class InsertIntoITCase extends StreamingWithStateTestBase { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala index 83ffbffd5c0a1..3fa793b481755 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.stream.sql import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark @@ -142,7 +141,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -201,7 +199,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -248,7 +245,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -311,7 +307,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -366,7 +361,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -421,7 +415,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -515,7 +508,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -580,7 +572,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -669,7 +660,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -730,7 +720,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -819,7 +808,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -883,7 +871,6 @@ class JoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala index 1b56d1a4c1f6c..08d0620e900cd 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala @@ -58,7 +58,6 @@ class OverWindowITCase extends StreamingWithStateTestBase { def setup(): Unit = { StreamITCase.clear env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) env.setParallelism(1) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala index 4c1d77286d467..b1c15bdf95473 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala @@ -79,7 +79,6 @@ class SortITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) StreamITCase.clear @@ -114,7 +113,6 @@ class SortITCase extends StreamingWithStateTestBase { @Test def testInsertIntoMemoryTableOrderBy(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) MemoryTableSourceSinkUtil.clear() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 8e9f100578d3e..9e292df7667f0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -74,7 +74,6 @@ class SqlITCase extends StreamingWithStateTestBase { (16L, 16, "Hello")) // (3, Hello) - window (not merged) val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val stream = env @@ -114,7 +113,6 @@ class SqlITCase extends StreamingWithStateTestBase { def testDistinctAggOnRowTimeTumbleWindow(): Unit = { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, settings) env.setParallelism(1) @@ -147,7 +145,6 @@ class SqlITCase extends StreamingWithStateTestBase { def testRowTimeTumbleWindow(): Unit = { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, settings) env.setParallelism(1) @@ -702,7 +699,6 @@ class SqlITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, settings) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) env.setParallelism(1) @@ -752,7 +748,6 @@ class SqlITCase extends StreamingWithStateTestBase { def testWriteReadTableSourceSink(): Unit = { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = StreamTableEnvironment.create(env, settings) MemoryTableSourceSinkUtil.clear() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala index b71388a960a4b..ad424ce1a37d4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala @@ -105,7 +105,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase { val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -167,7 +166,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build val tEnv = StreamTableEnvironment.create(env, settings) env.setStateBackend(getStateBackend) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index 4c5dd21fa772c..6917e0ef1286f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -110,7 +110,6 @@ class GroupWindowITCase extends AbstractTestBase { (16L, 16, "Hello")) val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -171,7 +170,6 @@ class GroupWindowITCase extends AbstractTestBase { @Test def testEventTimeTumblingWindow(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -243,7 +241,6 @@ class GroupWindowITCase extends AbstractTestBase { def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -282,7 +279,6 @@ class GroupWindowITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -322,7 +318,6 @@ class GroupWindowITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -359,7 +354,6 @@ class GroupWindowITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -390,7 +384,6 @@ class GroupWindowITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -419,7 +412,6 @@ class GroupWindowITCase extends AbstractTestBase { @Test def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -448,7 +440,6 @@ class GroupWindowITCase extends AbstractTestBase { @Test def testRowbasedAggregateWithEventTimeTumblingWindow(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowTableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowTableAggregateITCase.scala index 948d27a063d6c..2a0366e7e2372 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowTableAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowTableAggregateITCase.scala @@ -103,7 +103,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { (16L, 16, "Hello")) val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -157,7 +156,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { @Test def testEventTimeTumblingWindow(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -238,7 +236,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -287,7 +284,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -333,7 +329,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -375,7 +370,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -409,7 +403,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = { // please keep this test in sync with the DataSet variant val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -440,7 +433,6 @@ class GroupWindowTableAggregateITCase extends AbstractTestBase { @Test def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala index 04a766b0f88b1..497aaa702741c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala @@ -137,7 +137,6 @@ class OverWindowITCase extends StreamingWithStateTestBase { val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) env.setParallelism(1) @@ -290,7 +289,6 @@ class OverWindowITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -354,7 +352,6 @@ class OverWindowITCase extends StreamingWithStateTestBase { Right(19000L)) val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 841131dab5e52..701c8426498bf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -51,7 +51,6 @@ class TableSinkITCase extends AbstractTestBase { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) @@ -95,7 +94,6 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(4) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index d56b0c900b198..dd69d76683cb4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -196,7 +196,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeRowTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -233,7 +232,6 @@ class TableSourceITCase extends AbstractTestBase { def testProctimeRowTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -272,7 +270,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeProctimeRowTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -311,7 +308,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeAsTimestampRowTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -348,7 +344,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeLongTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -378,7 +373,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeStringTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -414,7 +408,6 @@ class TableSourceITCase extends AbstractTestBase { def testProctimeStringTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -440,7 +433,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeProctimeLongTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -473,7 +465,6 @@ class TableSourceITCase extends AbstractTestBase { def testFieldMappingTableSource(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -509,7 +500,6 @@ class TableSourceITCase extends AbstractTestBase { @Test def testProjectWithoutRowtimeProctime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -547,7 +537,6 @@ class TableSourceITCase extends AbstractTestBase { @Test def testProjectWithoutProctime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -585,7 +574,6 @@ class TableSourceITCase extends AbstractTestBase { @Test def testProjectWithoutRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -623,7 +611,6 @@ class TableSourceITCase extends AbstractTestBase { def testProjectOnlyProctime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -657,7 +644,6 @@ class TableSourceITCase extends AbstractTestBase { def testProjectOnlyRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -695,7 +681,6 @@ class TableSourceITCase extends AbstractTestBase { @Test def testProjectWithMapping(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -734,7 +719,6 @@ class TableSourceITCase extends AbstractTestBase { @Test def testNestedProject(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings) @@ -806,7 +790,6 @@ class TableSourceITCase extends AbstractTestBase { def testRowtimeTableSourcePreserveWatermarks(): Unit = { val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().build() val tEnv = StreamTableEnvironment.create(env, settings)