[FLINK-2098] Ensure checkpoints and element emission are in order
Before, it could happen that a streaming source would keep emitting
elements while a checkpoint was being performed. This could lead to
checkpoints that are inconsistent with the state of other operators.

This also adds a test that verifies that only one checkpoint is
executed at a time and that checkpointing and element emission happen
serially.

This changes the SourceFunction interface to have run()/cancel() methods
where the run() method takes a lock object on which it needs to
synchronize updates to state and emission of elements.
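
The new contract can be sketched as follows (a hypothetical example, not code
from this commit; the class and field names are illustrative):

```java
// Sketch of the new SourceFunction contract: state updates and element
// emission happen under the checkpoint lock, so a checkpoint can never fall
// between a state update and the emission of the corresponding element.
public class CounterSource implements SourceFunction<Long> {

	private volatile boolean isRunning = true;
	private long counter = 0; // state that a checkpoint would snapshot

	@Override
	public void run(Object checkpointLock, Collector<Long> out) throws Exception {
		while (isRunning) {
			synchronized (checkpointLock) {
				counter++;
				out.collect(counter);
			}
		}
	}

	@Override
	public void cancel() {
		isRunning = false;
	}
}
```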
aljoscha committed Jun 4, 2015
1 parent 39010dc commit 6685b0b
Showing 39 changed files with 1,616 additions and 867 deletions.
19 changes: 7 additions & 12 deletions docs/apis/streaming_guide.md
@@ -309,18 +309,13 @@ parallelism of 1. To create parallel sources the user's source function needs to
the parallelism of the environment. The parallelism for ParallelSourceFunctions can be changed
after creation by using `source.setParallelism(parallelism)`.

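For illustration, creating a parallel source and adjusting its parallelism
might look like this (a hypothetical snippet; `MyParallelSource` is an assumed
user-defined class, not part of this commit):

```java
// Hypothetical usage: MyParallelSource is assumed to implement
// ParallelSourceFunction<String>.
DataStream<String> parallelStream = env
		.addSource(new MyParallelSource())
		.setParallelism(4); // run four parallel instances of the source
```
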
-The `SourceFunction` interface has two methods: `reachedEnd()` and `next()`. The former is used
-by the system to determine whether more input data is available. This method can block if there
-is no data available right now but more data might arrive in the future. The `next()` method
-is called to get the next data element. This method will only be called if `reachedEnd()` returns
-false. It can also block if no data is currently available but more will arrive in the
-future.
-
-The methods must react to thread interrupts and break out of blocking calls with
-`InterruptedException`. A method may ignore interrupts and/or swallow InterruptedExceptions
-if it is guaranteed to return almost immediately irrespective of the input. This is true,
-for example, for file streams, where the call is guaranteed to return after a very short
-I/O delay on the order of milliseconds.
+The `SourceFunction` interface has two methods: `run(SourceContext)` and `cancel()`. The `run()`
+method is not expected to return until the source has either finished by itself or received
+a cancel request. The source can communicate with the outside world using the source context. For
+example, the `emit(element)` method is used to emit one element from the source. Most sources will
+have an infinite while loop inside the `run()` method to read from the input and emit elements.
+Upon invocation of the `cancel()` method the source is required to break out of its internal
+loop and return from the `run()` method.

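A minimal sketch of this contract (hypothetical; it assumes the `SourceContext`
with an `emit()` method described above, and `readNextValue()` is an assumed
helper, not a real API):

```java
// Sketch of the run()/cancel() contract described above.
public class SensorSource implements SourceFunction<String> {

	private volatile boolean isRunning = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		// Typical infinite emission loop.
		while (isRunning) {
			ctx.emit(readNextValue()); // readNextValue() is an assumed helper
		}
	}

	@Override
	public void cancel() {
		isRunning = false; // makes run() break out of its loop and return
	}
}
```
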
In addition to the bounded data sources (with similar method signatures as the
[batch API](programming_guide.html#data-sources)) there are several predefined stream sources
@@ -19,6 +19,10 @@
package org.apache.flink.runtime.jobgraph.tasks;

public interface CheckpointedOperator {

+	/**
+	 * This method is either called directly by the checkpoint coordinator, or called
+	 * when all incoming channels have reported a barrier.
+	 */
void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
}
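
A hedged sketch of an implementer (the names, lock, and state fields are
assumptions for illustration, not code from this commit):

```java
// Illustrative CheckpointedOperator implementation: the checkpoint is drawn
// under the same lock that guards element emission, so checkpointing and
// emission are serialized.
public class CheckpointedSourceTask implements CheckpointedOperator {

	// Assumed to be the same lock the source's run() method synchronizes on.
	private final Object checkpointLock = new Object();

	private long lastCheckpointId = -1;

	@Override
	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
		synchronized (checkpointLock) {
			// No element can be emitted while this block runs.
			lastCheckpointId = checkpointId;
			// ... snapshot and acknowledge operator state here ...
		}
	}
}
```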
@@ -48,17 +48,22 @@ public static void main(String[] args) {

// data stream with random numbers
DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
	private static final long serialVersionUID = 1L;

+	private volatile boolean isRunning = true;
+
	@Override
-	public boolean reachedEnd() throws Exception {
-		return false;
+	public void run(Object checkpointLock, Collector<String> out) throws Exception {
+		while (isRunning) {
+			out.collect(String.valueOf(Math.floor(Math.random() * 100)));
+		}
	}

	@Override
-	public String next() throws Exception {
-		return String.valueOf(Math.floor(Math.random() * 100));
+	public void cancel() {
+		isRunning = false;
	}
});
dataStream.write(new HBaseOutputFormat(), 0L);

@@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.Collector;

public class KafkaProducerExample {

@@ -39,29 +40,25 @@ public static void main(String[] args) throws Exception {

@SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
-
-	private int index = 0;
-
	@Override
-	public boolean reachedEnd() throws Exception {
-		return index >= 20;
+	public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+		for (int i = 0; i < 20; i++) {
+			collector.collect("message #" + i);
+			Thread.sleep(100L);
+		}
+
+		collector.collect("q");
	}

	@Override
-	public String next() throws Exception {
-		if (index < 20) {
-			String result = "message #" + index;
-			index++;
-			return result;
-		}
-
-		return "q";
+	public void cancel() {
	}
}).addSink(
		new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
)
-	.setParallelism(3);
+		.setParallelism(3);

env.execute();
}
@@ -32,12 +32,13 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.ConnectorSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Source that listens to a Kafka topic using the high level Kafka API.
*
* @param <OUT>
* Type of the messages on the topic.
*/
@@ -59,9 +60,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
private static final String DEFAULT_GROUP_ID = "flink-group";

-	// We must read this in reachedEnd() to check for the end. We keep it to return it in
-	// next()
-	private OUT nextElement;
+	private volatile boolean isRunning = false;

/**
* Creates a KafkaSource that consumes a topic.
@@ -78,14 +77,15 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
* Synchronization time with zookeeper.
*/
public KafkaSource(String zookeeperAddress,
-		String topicId, String groupId,
-		DeserializationSchema<OUT> deserializationSchema,
-		long zookeeperSyncTimeMillis) {
+		String topicId,
+		String groupId,
+		DeserializationSchema<OUT> deserializationSchema,
+		long zookeeperSyncTimeMillis) {
this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
}
/**
* Creates a KafkaSource that consumes a topic.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
* @param topicId
@@ -100,9 +100,9 @@ public KafkaSource(String zookeeperAddress,
* Custom properties for Kafka
*/
public KafkaSource(String zookeeperAddress,
		String topicId, String groupId,
		DeserializationSchema<OUT> deserializationSchema,
		long zookeeperSyncTimeMillis, Properties customProperties) {
super(deserializationSchema);
Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
Preconditions.checkNotNull(topicId, "Topic ID is null");
@@ -178,42 +178,31 @@ private void initializeConnection() {
}

	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		initializeConnection();
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		if (consumer != null) {
-			consumer.shutdown();
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		if (nextElement != null) {
-			return false;
-		} else if (consumerIterator.hasNext()) {
-			OUT out = schema.deserialize(consumerIterator.next().message());
-			if (schema.isEndOfStream(out)) {
-				return true;
-			}
-			nextElement = out;
-		}
-		return false;
-	}
-
-	@Override
-	public OUT next() throws Exception {
-		if (!reachedEnd()) {
-			OUT result = nextElement;
-			nextElement = null;
-			return result;
-		} else {
-			throw new RuntimeException("Source exhausted");
+	public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+		isRunning = true;
+		try {
+			while (isRunning && consumerIterator.hasNext()) {
+				OUT out = schema.deserialize(consumerIterator.next().message());
+				if (schema.isEndOfStream(out)) {
+					break;
+				}
+				collector.collect(out);
+			}
+		} finally {
+			consumer.shutdown();
+		}
+	}
+
+	@Override
+	public void open(Configuration config) throws Exception {
+		initializeConnection();
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+		if (consumer != null) {
+			consumer.shutdown();
		}
	}

}
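
For context, constructing and using the reworked source might look like this
(hypothetical; all constructor arguments are placeholder values, and
`JavaDefaultStringSchema` is assumed to be usable as the deserialization
schema here):

```java
// Hypothetical usage of KafkaSource after this change; values are placeholders.
DataStream<String> kafkaStream = env.addSource(
		new KafkaSource<String>(
				"localhost:2181",              // ZooKeeper address (with port)
				"my-topic",                    // topic to consume
				"flink-group",                 // consumer group id
				new JavaDefaultStringSchema(), // assumed DeserializationSchema<String>
				200L));                        // ZooKeeper sync time in ms
```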
