Skip to content

Commit

Permalink
[FLINK-19317] Remove unnecessary calls to setStreamTimeCharacteristic…
Browse files Browse the repository at this point in the history
… (scala)

I'm just removing calls the set EventTime because that's the new default
now.

I'm also removing most calls to set ProcessingTime because it's not
needed for making processing-time timers/windows work. I only left it
for some tests that check specific failure behavior.

I removed calls to set IngestionTime and replaced them by an explicit
IngestionTimeWatermarkStrategy. I duplicated the same
IngestionTimeWatermarkStrategy in all the examples/tests because I
explicitly didn't want to add an IngestionTimeWatermarkStrategy in one
of the core packages so that it is not discoverable because I think we
shouldn't encourage users to use ingestion time.
  • Loading branch information
aljoscha committed Oct 1, 2020
1 parent cb4de07 commit c214651
Show file tree
Hide file tree
Showing 31 changed files with 0 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.state.{AggregatingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
Expand Down Expand Up @@ -69,8 +68,6 @@ class AllWindowTranslationTest {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.fromElements(("hello", 1), ("hello", 2))

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

source
.windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.reduce(new RichReduceFunction[(String, Int)] {
Expand All @@ -89,8 +86,6 @@ class AllWindowTranslationTest {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.fromElements(("hello", 1), ("hello", 2))

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

source
.windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.aggregate(new DummyRichAggregator())
Expand Down Expand Up @@ -133,7 +128,6 @@ class AllWindowTranslationTest {
@Test
def testMergingWindowsWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -172,7 +166,6 @@ class AllWindowTranslationTest {
@Test
def testReduceEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -205,7 +198,6 @@ class AllWindowTranslationTest {
@Test
def testReduceProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -238,7 +230,6 @@ class AllWindowTranslationTest {
@Test
def testReduceEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -271,7 +262,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithWindowFunctionEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -311,7 +301,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithWindowFunctionProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -350,7 +339,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithProcessWindowFunctionEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -390,7 +378,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithProcessWindowFunctionProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -429,7 +416,6 @@ class AllWindowTranslationTest {
@Test
def testApplyWithPreReducerEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -468,7 +454,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -510,7 +495,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -543,7 +527,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -576,7 +559,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateWithWindowFunctionEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -609,7 +591,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateWithWindowFunctionProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -642,7 +623,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateWithProcessWindowFunctionEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -675,7 +655,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateWithProcessWindowFunctionProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -708,7 +687,6 @@ class AllWindowTranslationTest {
@Test
def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -749,7 +727,6 @@ class AllWindowTranslationTest {
@Test
def testApplyEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -788,7 +765,6 @@ class AllWindowTranslationTest {
@Test
def testApplyProcessingTimeTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -827,7 +803,6 @@ class AllWindowTranslationTest {
@Test
def testProcessEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -866,7 +841,6 @@ class AllWindowTranslationTest {
@Test
def testProcessProcessingTimeTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -905,7 +879,6 @@ class AllWindowTranslationTest {
@Test
def testApplyEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -941,7 +914,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithCustomTrigger() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -975,7 +947,6 @@ class AllWindowTranslationTest {
@Test
def testApplyWithCustomTrigger() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -1015,7 +986,6 @@ class AllWindowTranslationTest {
@Test
def testProcessWithCustomTrigger() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -1055,7 +1025,6 @@ class AllWindowTranslationTest {
@Test
def testReduceWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -1091,7 +1060,6 @@ class AllWindowTranslationTest {
@Test
def testApplyWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down Expand Up @@ -1132,7 +1100,6 @@ class AllWindowTranslationTest {
@Test
def testProcessWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val source = env.fromElements(("hello", 1), ("hello", 2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.scala

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
Expand Down Expand Up @@ -55,7 +54,6 @@ class BroadcastStateITCase extends AbstractTestBase {
5L -> "test:5")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val srcOne = env
.generateSequence(0L, 5L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.scala

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
Expand All @@ -40,7 +39,6 @@ class CoGroupJoinITCase extends AbstractTestBase {
CoGroupJoinITCase.testResults = mutable.MutableList()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val source1 = env.addSource(new SourceFunction[(String, Int)]() {
Expand Down Expand Up @@ -107,7 +105,6 @@ class CoGroupJoinITCase extends AbstractTestBase {
CoGroupJoinITCase.testResults = mutable.MutableList()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
Expand Down Expand Up @@ -187,7 +184,6 @@ class CoGroupJoinITCase extends AbstractTestBase {
CoGroupJoinITCase.testResults = mutable.MutableList()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.scala

import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
Expand All @@ -36,7 +35,6 @@ class IntervalJoinITCase extends AbstractTestBase {
@Test
def testInclusiveBounds(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val dataStream1 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
Expand Down Expand Up @@ -74,7 +72,6 @@ class IntervalJoinITCase extends AbstractTestBase {
@Test
def testExclusiveBounds(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val dataStream1 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.scala

import java.util

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, ProcessFunction}
import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
Expand Down Expand Up @@ -154,7 +153,6 @@ class SideOutputITCase extends AbstractTestBase {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))

Expand Down Expand Up @@ -198,7 +196,6 @@ class SideOutputITCase extends AbstractTestBase {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))

Expand Down Expand Up @@ -244,7 +241,6 @@ class SideOutputITCase extends AbstractTestBase {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))

Expand Down Expand Up @@ -291,7 +287,6 @@ class SideOutputITCase extends AbstractTestBase {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))

Expand Down
Loading

0 comments on commit c214651

Please sign in to comment.