[FLINK-19317] Make EventTime the default TimeCharacteristic
This is part of the FLIP-134 (Batch execution for the DataStream API)
work.

Event time is the only sensible time characteristic for batch
processing. We therefore change the default value of the
TimeCharacteristic from ProcessingTime to EventTime. This means that
DataStream API programs that were already using event time now just
work without changing this setting manually. Processing-time programs
also still work, because using processing-time timers does not depend
on the TimeCharacteristic. DataStream programs that set neither a
TimestampAssigner nor a WatermarkStrategy also still work, as long as
they don't use operations that rely on (event-time) timestamps. This
is true for both BATCH and STREAMING execution mode.
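
As an illustration, here is a minimal sketch (not part of this commit;
the class and job names are made up) of an event-time pipeline that
now works without any time-characteristic setup, assuming the
WatermarkStrategy API from FLIP-126:

    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class EventTimeDefaultExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // No env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
            // needed anymore: event time is now the default.
            env.fromElements(1L, 2L, 3L)
                    // elements double as their own timestamps in this toy example
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                    .withTimestampAssigner((value, timestamp) -> value))
                    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .reduce(Long::sum)
                    .print();

            env.execute("event-time by default");
        }
    }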

With this change, users no longer need to call
setStreamTimeCharacteristic(EventTime). We will make sure they learn
of this by deprecating the method in a follow-up commit.

The only real user-visible change is that programs using the
KeyedStream.timeWindow()/DataStream.timeWindow() operations, which
depend on the TimeCharacteristic, will now use event time by default.
We don't think these operations are useful because their behaviour can
be surprising; we recommend that users always use an explicit
processing-time or event-time window.
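
As a sketch of the recommended migration (hypothetical helper class;
keyedStream stands in for any keyed stream of (word, count) pairs):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    class ExplicitWindowsSketch {
        // before: keyedStream.timeWindow(Time.seconds(5)) silently picked
        // the window type from the TimeCharacteristic

        // after: processing time, stated explicitly
        static void processingTime(KeyedStream<Tuple2<String, Long>, String> keyedStream) {
            keyedStream
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .sum(1);
        }

        // after: event time, stated explicitly (needs watermarks upstream)
        static void eventTime(KeyedStream<Tuple2<String, Long>, String> keyedStream) {
            keyedStream
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum(1);
        }
    }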

We also change the default watermark interval from 0 (disabled) to
200 ms to match the previous behaviour of calling
setStreamTimeCharacteristic(EventTime).
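
A sketch of what this means for user code (hypothetical snippet, not
from this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WatermarkIntervalSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // After this commit the default is 200 ms, the value that
            // setStreamTimeCharacteristic(EventTime) used to configure.
            long intervalMs = env.getConfig().getAutoWatermarkInterval();
            System.out.println("auto watermark interval: " + intervalMs + " ms");

            // Programs that relied on the old default of 0 (periodic
            // watermark generation disabled) can restore it explicitly:
            env.getConfig().setAutoWatermarkInterval(0);
        }
    }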
aljoscha committed Oct 1, 2020
1 parent 6e5240f commit 1175364
Showing 28 changed files with 285 additions and 199 deletions.
@@ -126,7 +126,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private boolean autoTypeRegistrationEnabled = true;

private boolean forceAvro = false;
private long autoWatermarkInterval = 0;
private long autoWatermarkInterval = 200;

/**
* Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
@@ -30,11 +30,11 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
@@ -58,7 +58,6 @@ public static void main(String[] args) throws Exception {
int offset = params.getInt("offsetInSecond", 0);

StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
sEnv.enableCheckpointing(4000);
sEnv.getConfig().setAutoWatermarkInterval(1000);

@@ -69,7 +68,7 @@ public static void main(String[] args) throws Exception {

DataStream<Tuple> result = rows
.keyBy(1)
.timeWindow(Time.seconds(5))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(0);

result.writeAsText(outputPath + "/result.txt", FileSystem.WriteMode.OVERWRITE)
@@ -36,7 +36,6 @@
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -45,6 +44,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper;
@@ -236,8 +236,6 @@ public static void setupEnvironment(StreamExecutionEnvironment env, ParameterToo
setupRestartStrategy(env, pt);
setupStateBackend(env, pt);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(pt);
}
@@ -378,14 +376,15 @@ static WindowedStream<Event, Integer, TimeWindow> applyTumblingWindows(
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue());

return keyedStream.timeWindow(
Time.milliseconds(
pt.getLong(
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.key(),
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.defaultValue()
) * eventTimeProgressPerEvent
)
);
return keyedStream.window(
TumblingEventTimeWindows.of(
Time.milliseconds(
pt.getLong(
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.key(),
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.defaultValue()
) * eventTimeProgressPerEvent
)
));
}

static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
@@ -18,20 +18,18 @@
package org.apache.flink.streaming.examples.ml;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
* Skeleton for incremental machine learning algorithm consisting of a
* pre-computed model, which gets updated for the new inputs and new input data
@@ -61,15 +59,14 @@ public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Integer> trainingData = env.addSource(new FiniteTrainingDataSource());
DataStream<Integer> newData = env.addSource(new FiniteNewDataSource());

// build new model on every second of new data
DataStream<Double[]> model = trainingData
.assignTimestampsAndWatermarks(new LinearTimestamp())
.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(5000)))
.apply(new PartialModelBuilder());

// use partial model for newData
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

@@ -77,7 +78,7 @@ public void flatMap(String value, Collector<WordWithCount> out) {
})

.keyBy(value -> value.word)
.timeWindow(Time.seconds(5))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

.reduce(new ReduceFunction<WordWithCount>() {
@Override
@@ -27,12 +27,11 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* An example of grouped stream windowing into sliding time windows.
* This example uses [[RichParallelSourceFunction]] to generate a list of key-value pairs.
@@ -48,12 +47,12 @@ public static void main(String[] args) throws Exception {

stream
.keyBy(value -> value.f0)
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
.reduce(new SummingReducer())

// alternative: use an apply function which does not pre-aggregate
// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
// .window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
// .apply(new SummingWindowFunction())

.addSink(new SinkFunction<Tuple2<Long, Long>>() {
@@ -18,18 +18,16 @@

package org.apache.flink.streaming.scala.examples.ml

import java.util.concurrent.TimeUnit

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
@@ -62,19 +60,18 @@ object IncrementalLearningSkeleton {

// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// build new model on every second of new data
val trainingData: DataStream[Int] = env.addSource(new FiniteTrainingDataSource)
val newData: DataStream[Int] = env.addSource(new FiniteNewDataSource)
val trainingData: DataStream[Integer] = env.addSource(new FiniteTrainingDataSource)
val newData: DataStream[Integer] = env.addSource(new FiniteNewDataSource)

val model: DataStream[Array[Double]] = trainingData
val model: DataStream[Array[java.lang.Double]] = trainingData
.assignTimestampsAndWatermarks(new LinearTimestamp)
.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(5000)))
.apply(new PartialModelBuilder)

// use partial model for newData
val prediction: DataStream[Int] = newData.connect(model).map(new Predictor)
val prediction: DataStream[Integer] = newData.connect(model).map(new Predictor)

// emit result
if (params.has("output")) {
@@ -96,8 +93,8 @@ object IncrementalLearningSkeleton {
* Feeds new data for newData. By default it is implemented as constantly
* emitting the Integer 1 in a loop.
*/
private class FiniteNewDataSource extends SourceFunction[Int] {
override def run(ctx: SourceContext[Int]) = {
private class FiniteNewDataSource extends SourceFunction[Integer] {
override def run(ctx: SourceContext[Integer]) = {
Thread.sleep(15)
(0 until 50).foreach{ _ =>
Thread.sleep(5)
@@ -114,37 +111,40 @@ object IncrementalLearningSkeleton {
* Feeds new training data for the partial model builder. By default it is
* implemented as constantly emitting the Integer 1 in a loop.
*/
private class FiniteTrainingDataSource extends SourceFunction[Int] {
override def run(ctx: SourceContext[Int]) = (0 until 8200).foreach( _ => ctx.collect(1) )
private class FiniteTrainingDataSource extends SourceFunction[Integer] {
override def run(ctx: SourceContext[Integer]) =
(0 until 8200).foreach( _ => ctx.collect(1) )

override def cancel() = {
// No cleanup needed
}
}

private class LinearTimestamp extends AssignerWithPunctuatedWatermarks[Int] {
private class LinearTimestamp extends AssignerWithPunctuatedWatermarks[Integer] {
var counter = 0L

override def extractTimestamp(element: Int, previousElementTimestamp: Long): Long = {
override def extractTimestamp(element: Integer, previousElementTimestamp: Long): Long = {
counter += 10L
counter
}

override def checkAndGetNextWatermark(lastElement: Int, extractedTimestamp: Long) = {
override def checkAndGetNextWatermark(lastElement: Integer, extractedTimestamp: Long) = {
new Watermark(counter - 1)
}
}

/**
* Builds up-to-date partial models on new training data.
*/
private class PartialModelBuilder extends AllWindowFunction[Int, Array[Double], TimeWindow] {
private class PartialModelBuilder
extends AllWindowFunction[Integer, Array[java.lang.Double], TimeWindow] {

protected def buildPartialModel(values: Iterable[Int]): Array[Double] = Array[Double](1)
protected def buildPartialModel(values: Iterable[Integer]): Array[java.lang.Double] =
Array[java.lang.Double](1)

override def apply(window: TimeWindow,
values: Iterable[Int],
out: Collector[Array[Double]]): Unit = {
values: Iterable[Integer],
out: Collector[Array[java.lang.Double]]): Unit = {
out.collect(buildPartialModel(values))
}
}
Expand All @@ -157,25 +157,25 @@ object IncrementalLearningSkeleton {
* for every model update.
*
*/
private class Predictor extends CoMapFunction[Int, Array[Double], Int] {
private class Predictor extends CoMapFunction[Integer, Array[java.lang.Double], Integer] {

var batchModel: Array[Double] = null
var partialModel: Array[Double] = null
var batchModel: Array[java.lang.Double] = null
var partialModel: Array[java.lang.Double] = null

override def map1(value: Int): Int = {
override def map1(value: Integer): Integer = {
// Return newData
predict(value)
}

override def map2(value: Array[Double]): Int = {
override def map2(value: Array[java.lang.Double]): Integer = {
// Update model
partialModel = value
batchModel = getBatchModel()
1
}

// pulls model built with batch-job on the old training data
protected def getBatchModel(): Array[Double] = Array[Double](0)
protected def getBatchModel(): Array[java.lang.Double] = Array[java.lang.Double](0)

// performs newData using the two models
protected def predict(inTuple: Int): Int = 0
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.scala.examples.socket

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
@@ -68,7 +69,7 @@ object SocketWindowWordCount {
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy(_.word)
.timeWindow(Time.seconds(5))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum("count")

// print the results with a single thread, rather than in parallel
@@ -18,13 +18,12 @@

package org.apache.flink.streaming.scala.examples.windowing

import java.util.concurrent.TimeUnit.MILLISECONDS

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
@@ -42,7 +41,7 @@ object GroupedProcessingTimeWindowExample {

stream
.keyBy(_._1)
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
.reduce((value1, value2) => (value1._1, value1._2 + value2._2))
.addSink(new SinkFunction[(Long, Long)]() {
override def invoke(in: (Long, Long)): Unit = {}