Skip to content

Commit

Permalink
[FLINK-19317] Remove unnecessary calls to setStreamTimeCharacteristic…
Browse files Browse the repository at this point in the history
… (java)

I'm just removing calls that set EventTime because that's the new default
now.

I'm also removing most calls to set ProcessingTime because it's not
needed for making processing-time timers/windows work. I only left it
for some tests that check specific failure behavior.

I removed calls to set IngestionTime and replaced them by an explicit
IngestionTimeWatermarkStrategy. I duplicated the same
IngestionTimeWatermarkStrategy in all the examples/tests because I
explicitly didn't want to add an IngestionTimeWatermarkStrategy in one
of the core packages so that it is not discoverable because I think we
shouldn't encourage users to use ingestion time.
  • Loading branch information
aljoscha committed Oct 1, 2020
1 parent 1175364 commit cb4de07
Show file tree
Hide file tree
Showing 30 changed files with 115 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
Expand Down Expand Up @@ -147,7 +146,6 @@ public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleData
public void testReal() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
Expand All @@ -171,7 +169,6 @@ public void testReal() throws Exception {
public void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
Expand Down Expand Up @@ -131,7 +130,6 @@ public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleData
public void testReal() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(
Expand All @@ -151,7 +149,6 @@ public void testReal() throws Exception {
public void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table t = tEnv.fromDataStream(get4TupleDataStream(env).assignTimestampsAndWatermarks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down Expand Up @@ -1548,7 +1547,6 @@ public void runCollectingSchemaTest() throws Exception {

// read using custom schema
final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
env1.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env1.setParallelism(1);
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
Expand Down Expand Up @@ -199,7 +198,6 @@ public void testTimestamps() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
private static final long serialVersionUID = -2255115836471289626L;
Expand Down Expand Up @@ -242,7 +240,6 @@ public int partition(Long next, byte[] key, byte[] value, String targetTopic, in
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer<Long> kafkaSource = new FlinkKafkaConsumer<>(topic, new KafkaITCase.LimitedLongDeserializer(), standardProps);
kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -103,7 +102,6 @@ public static void main(String[] args) throws Exception {
3,
Time.of(10, TimeUnit.SECONDS)
));
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
sEnv.enableCheckpointing(4000);
sEnv.getConfig().setAutoWatermarkInterval(1000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
Expand All @@ -45,7 +44,6 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

return env;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -58,8 +57,6 @@ public class AsyncIOExample {
private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);

private static final String EXACTLY_ONCE_MODE = "exactly_once";
private static final String EVENT_TIME = "EventTime";
private static final String INGESTION_TIME = "IngestionTime";
private static final String ORDERED = "ordered";

/**
Expand Down Expand Up @@ -210,7 +207,6 @@ public static void main(String[] args) throws Exception {
final float failRatio;
final String mode;
final int taskNum;
final String timeType;
final long shutdownWaitTS;
final long timeout;

Expand All @@ -223,7 +219,6 @@ public static void main(String[] args) throws Exception {
failRatio = params.getFloat("failRatio", 0.001f);
mode = params.get("waitMode", "ordered");
taskNum = params.getInt("waitOperatorParallelism", 1);
timeType = params.get("eventType", "EventTime");
shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
timeout = params.getLong("timeout", 10000L);
} catch (Exception e) {
Expand All @@ -245,7 +240,6 @@ public static void main(String[] args) throws Exception {
.append("Fail ratio=").append(failRatio).append(lineSeparator)
.append("Waiting mode=").append(mode).append(lineSeparator)
.append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator)
.append("Event type=").append(timeType).append(lineSeparator)
.append("Shutdown wait timestamp=").append(shutdownWaitTS);

LOG.info(configStringBuilder.toString());
Expand All @@ -262,14 +256,6 @@ public static void main(String[] args) throws Exception {
env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
}

// enable watermark or not
if (EVENT_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
}
else if (INGESTION_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
}

// create input stream of a single integer
DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package org.apache.flink.streaming.examples.join;

import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
Expand Down Expand Up @@ -57,14 +62,18 @@ public static void main(String[] args) throws Exception {

// obtain execution environment, run this example in "ingestion time"
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);

// create the data sources for both grades and salaries
DataStream<Tuple2<String, Integer>> grades = GradeSource.getSource(env, rate);
DataStream<Tuple2<String, Integer>> salaries = SalarySource.getSource(env, rate);
DataStream<Tuple2<String, Integer>> grades = GradeSource
.getSource(env, rate)
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());

DataStream<Tuple2<String, Integer>> salaries = SalarySource
.getSource(env, rate)
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());

// run the actual window join program
// for testability, this functionality is in a separate method.
Expand Down Expand Up @@ -103,4 +112,29 @@ public String getKey(Tuple2<String, Integer> value) {
return value.f0;
}
}

/**
 * A {@link WatermarkStrategy} that emulates ingestion time by stamping each record with the
 * wall-clock time at which it passes the assigner. Real applications should instead extract
 * proper event-time timestamps and pick a fitting {@link WatermarkStrategy}.
 */
private static class IngestionTimeWatermarkStrategy<T> implements WatermarkStrategy<T> {

	private IngestionTimeWatermarkStrategy() {
	}

	/** Factory method; the class itself is not meant to be instantiated directly. */
	public static <T> IngestionTimeWatermarkStrategy<T> create() {
		return new IngestionTimeWatermarkStrategy<>();
	}

	@Override
	public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
		// Ignore any pre-existing timestamp and use "now" instead.
		return (record, previousTimestamp) -> System.currentTimeMillis();
	}

	@Override
	public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
		// Wall-clock timestamps are monotonically ascending, so ascending watermarks fit.
		return new AscendingTimestampsWatermarks<>();
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package org.apache.flink.streaming.examples.sideoutput;

import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -55,8 +60,6 @@ public static void main(String[] args) throws Exception {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);

Expand All @@ -72,6 +75,11 @@ public static void main(String[] args) throws Exception {
text = env.fromElements(WordCountData.WORDS);
}

// We assign the WatermarkStrategy after creating the source. In a real-world job you
// should integrate the WatermarkStrategy in the source. The Kafka source allows this,
// for example.
text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());

SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
.keyBy(new KeySelector<String, Integer>() {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -148,4 +156,29 @@ public void processElement(

}
}

/**
 * A {@link WatermarkStrategy} that emulates ingestion time by stamping each record with the
 * wall-clock time at which it passes the assigner. Real applications should instead extract
 * proper event-time timestamps and pick a fitting {@link WatermarkStrategy}.
 */
private static class IngestionTimeWatermarkStrategy<T> implements WatermarkStrategy<T> {

	private IngestionTimeWatermarkStrategy() {
	}

	/** Factory method; the class itself is not meant to be instantiated directly. */
	public static <T> IngestionTimeWatermarkStrategy<T> create() {
		return new IngestionTimeWatermarkStrategy<>();
	}

	@Override
	public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
		// Ignore any pre-existing timestamp and use "now" instead.
		return (record, previousTimestamp) -> System.currentTimeMillis();
	}

	@Override
	public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
		// Wall-clock timestamps are monotonically ascending, so ascending watermarks fit.
		return new AscendingTimestampsWatermarks<>();
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand All @@ -43,7 +42,6 @@ public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.getConfig().setGlobalJobParameters(params);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

final boolean fileOutput = params.has("output");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -53,7 +52,6 @@ public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setGlobalJobParameters(params);

@SuppressWarnings({"rawtypes", "serial"})
Expand Down
Loading

0 comments on commit cb4de07

Please sign in to comment.