Skip to content

Commit

Permalink
[FLINK-19319] Deprecate setStreamTimeCharacteristic() and TimeCharact…
Browse files Browse the repository at this point in the history
…eristic

After FLINK-19317 and FLINK-19318 we don't need this setting anymore.
Using (explicit) processing-time windows and processing-time timers work
fine in a program that has EventTime set as a time characteristic and
once we deprecate timeWindow() there are not other operations that
change behaviour depending on the time characteristic so there's no need
to ever change from the new default of event-time. Similarly, the
IngestionTime setting can be achieved in the future by providing an
ingestion-time WatermarkStrategy.
  • Loading branch information
aljoscha committed Oct 1, 2020
1 parent 5241b4b commit f8cc82b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ public ClosureCleanerLevel getClosureCleanerLevel() {
* the streaming system to keep track of the progress of time. They are used, for example,
* for time based windowing.
*
* <p>Setting an interval of {@code 0} will disable periodic watermark emission.
*
* @param interval The interval between watermarks in milliseconds.
*/
@PublicEvolving
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,26 @@
package org.apache.flink.streaming.api;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

/**
* The time characteristic defines how the system determines time for time-dependent
* order and operations that depend on time (such as time windows).
* The time characteristic defines how the system determines time for time-dependent order and
* operations that depend on time (such as time windows).
*
* @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
* TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
* event-time support anymore. Explicitly using processing-time windows and timers works in
* event-time mode. If you need to disable watermarks, please use {@link
* ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
* TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
* WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
* org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
* that change behaviour based on the time characteristic, please use equivalent operations
* that explicitly specify processing time or event time.
*/
@PublicEvolving
@Deprecated
public enum TimeCharacteristic {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,12 +722,24 @@ public void registerType(Class<?> type) {
* time, event time, or ingestion time.
*
* <p>If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
* watermark update interval of 200 ms. If this is not applicable for your application you
* should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*
* @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
* TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
* event-time support anymore. Explicitly using processing-time windows and timers works in
* event-time mode. If you need to disable watermarks, please use {@link
* ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
* TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
* WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
* org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
* that change behaviour based on the time characteristic, please use equivalent operations
* that explicitly specify processing time or event time.
*/
@PublicEvolving
@Deprecated
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
Expand All @@ -740,11 +752,11 @@ public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
/**
* Gets the time characteristic.
*
* @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
*
* @return The time characteristic.
* @deprecated See {@link #setStreamTimeCharacteristic(TimeCharacteristic)} for deprecation
* notice.
*/
@PublicEvolving
@Deprecated
public TimeCharacteristic getStreamTimeCharacteristic() {
return timeCharacteristic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.scala

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
Expand All @@ -36,8 +37,6 @@ import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.SplittableIterator

import com.esotericsoftware.kryo.Serializer

import _root_.scala.language.implicitConversions
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -389,7 +388,18 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
*
* @param characteristic The time characteristic.
* @deprecated In Flink 1.12 the default stream time characteristic has been changed to
* [[TimeCharacteristic.EventTime]], thus you don't need to call this method for
* enabling event-time support anymore. Explicitly using processing-time windows and
* timers works in event-time mode. If you need to disable watermarks, please use
* [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long]]. If
* you are using [[TimeCharacteristic.IngestionTime]], please manually set an
* appropriate [[WatermarkStrategy]]. If you are using generic "time window"
* operations (for example [[KeyedStream.timeWindow()]] that change behaviour based
* on the time characteristic, please use equivalent operations that explicitly
* specify processing time or event time.
*/
@deprecated
@PublicEvolving
def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
javaEnv.setStreamTimeCharacteristic(characteristic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// for TimeCharacteristic validation in StreamTableEnvironmentImpl
env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
if (environment.getExecution().getTimeCharacteristic() == TimeCharacteristic.EventTime) {
env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
}
return env;
Expand Down

0 comments on commit f8cc82b

Please sign in to comment.