Skip to content

Commit

Permalink
[FLINK-3121] Emit Final Watermark in Kafka Source
Browse files Browse the repository at this point in the history
Kafka sources that don't read from any partition never emit a watermark,
thereby blocking the progress of event-time in downstream operations.
This changes the Kafka Source to emit a Long.MAX_VALUE watermark if it
knows that it will never receive data.

This also changes the Timestamp Extraction operator to reacto to a
Long.MAX_VALUE watermark by itself emitting a Long.MAX_VALUE watermark.
  • Loading branch information
aljoscha committed Dec 11, 2015
1 parent 4b64887 commit 6bd5714
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
Expand Down Expand Up @@ -434,7 +435,12 @@ public void run(SourceContext<T> sourceContext) throws Exception {
}
}
else {
// this source never completes
// this source never completes, so emit a Long.MAX_VALUE watermark
// to not block watermark forwarding
if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
}

final Object waitLock = new Object();
while (running) {
// wait until we are canceled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public void run(final Object lockingObject, final Output<StreamRecord<T>> collec
// This will mostly emit a final +Inf Watermark to make the Watermark logic work
// when some sources finish before others do
ctx.close();

if (executionConfig.areTimestampsEnabled()) {
synchronized (lockingObject) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}

public void cancel() {
Expand Down Expand Up @@ -296,14 +302,6 @@ public Object getCheckpointLock() {
}

@Override
public void close() {
// emit one last +Inf watermark to make downstream watermark processing work
// when some sources close early
synchronized (lockingObject) {
if (watermarkMultiplexingEnabled) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ public void open() throws Exception {
currentWatermark = Long.MIN_VALUE;
}

@Override
public void close() throws Exception {
super.close();

// emit a final +Inf watermark, just like the sources
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}

@Override
public void processElement(StreamRecord<T> element) throws Exception {
long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
Expand All @@ -90,6 +82,11 @@ public void trigger(long timestamp) throws Exception {

@Override
public void processWatermark(Watermark mark) throws Exception {
// ignore them, since we are basically a watermark source
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark) {
currentWatermark = Long.MAX_VALUE;
output.emitWatermark(mark);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,68 @@ public long getCurrentWatermark() {
}
}

/**
* This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
*/
@Test
public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
final int NUM_ELEMENTS = 10;

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
env.setParallelism(2);
env.getConfig().disableSysoutLogging();
env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(1);


DataStream<Integer> source1 = env.addSource(new EventTimeSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int index = 0;
while (index < NUM_ELEMENTS) {
ctx.collectWithTimestamp(index, index);
ctx.collectWithTimestamp(index - 1, index - 1);
index++;
ctx.emitWatermark(new Watermark(index-2));
}

// emit the final Long.MAX_VALUE watermark, do it twice and verify that
// we only see one in the result
ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
}

@Override
public void cancel() {

}
});

source1.assignTimestamps(new TimestampExtractor<Integer>() {
@Override
public long extractTimestamp(Integer element, long currentTimestamp) {
return element;
}

@Override
public long extractWatermark(Integer element, long currentTimestamp) {
return Long.MIN_VALUE;
}

@Override
public long getCurrentWatermark() {
return Long.MIN_VALUE;
}
})
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));


env.execute();

Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
}

/**
* This tests whether the program throws an exception when an event-time source tries
* to emit without timestamp.
Expand Down

0 comments on commit 6bd5714

Please sign in to comment.