diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 49dc06898e8d8..4c7178ea8d8a2 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -309,18 +309,13 @@ parallelism of 1. To create parallel sources the users source function needs to
 the parallelism of the environment. The parallelism for ParallelSourceFunctions can
 be changed after creation by using `source.setParallelism(parallelism)`.
 
-The `SourceFunction` interface has two methods: `reachedEnd()` and `next()`. The former is used
-by the system to determine whether more input data is available. This method can block if there
-is no data available right now but there might come more data in the future. The `next()` method
-is called to get next data element. This method will only be called if `reachedEnd()` returns
-false. This method can also block if no data is currently available but more will arrive in the
-future.
-
-The methods must react to thread interrupt calls and break out of blocking calls with
-`InterruptedException`. The method may ignore interrupt calls and/or swallow InterruptedExceptions,
-if it is guaranteed that the method returns quasi immediately irrespectively of the input.
-This is true for example for file streams, where the call is guaranteed to return after a very
-short I/O delay in the order of milliseconds.
+The `SourceFunction` interface has two methods: `run(Object checkpointLock, Collector<T> out)`
+and `cancel()`. The `run()` method is not expected to return until the source has either finished
+by itself or received a cancel request. The source emits its elements through the collector passed
+to `run()`; for example, `out.collect(element)` emits one element from the source. Most sources
+will have an infinite while loop inside the `run()` method to read from the input and emit elements.
+Upon invocation of the `cancel()` method the source is required to break out of its internal
+loop and return from the `run()` method.
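+
+A minimal source under the new contract could look like the following sketch (the class name
+`ExampleCountSource` is illustrative, not part of the API):
+
+{% highlight java %}
+public class ExampleCountSource implements SourceFunction<Long> {
+    private long count = 0L;
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(Object checkpointLock, Collector<Long> out) throws Exception {
+        while (isRunning && count < 1000) {
+            // a source that also implements Checkpointed would synchronize
+            // on checkpointLock around the state update and the emission
+            out.collect(count++);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        // cancel() is called from a different thread than run(),
+        // which is why the flag is declared volatile
+        isRunning = false;
+    }
+}
+{% endhighlight %}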
 
 In addition to the bounded data sources (with similar method signatures as the
 [batch API](programming_guide.html#data-sources)) there are several predefined stream sources
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
index 17ba9477bf049..84072e15dc6ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -19,6 +19,10 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 public interface CheckpointedOperator {
-
+
+	/**
+	 * This method is either called directly by the checkpoint coordinator, or called
+	 * when all incoming channels have reported a barrier.
+	 */
 	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 }
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index 1e113c07938ff..74c6c57bb45c0 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -48,17 +48,22 @@ public static void main(String[] args) {
 
 		// data stream with random numbers
 		DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
+
+			private volatile boolean isRunning = true;
 
 			@Override
-			public boolean reachedEnd() throws Exception {
-				return false;
+			public void run(Object checkpointLock, Collector<String> out) throws Exception {
+				while (isRunning) {
+					out.collect(String.valueOf(Math.floor(Math.random() * 100)));
+				}
+
 			}
 
 			@Override
-			public String next() throws Exception {
-				return String.valueOf(Math.floor(Math.random() * 100));
+			public void cancel() {
+				isRunning = false;
 			}
-
 		});
 
 		dataStream.write(new HBaseOutputFormat(), 0L);
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 4dd5577fb6595..a29e93757700a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -22,6 +22,7 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.Collector;
 
 public class KafkaProducerExample {
 
@@ -39,29 +40,25 @@ public static void main(String[] args) throws Exception {
 
 		@SuppressWarnings({ "unused", "serial" })
 		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
-
-			private int index = 0;
-
 			@Override
-			public boolean reachedEnd() throws Exception {
-				return index >= 20;
+			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+				for (int i = 0; i < 20; i++) {
+					collector.collect("message #" + i);
+					Thread.sleep(100L);
+				}
+
+				collector.collect("q");
 			}
 
 			@Override
-			public String next() throws Exception {
-				if (index < 20) {
-					String result = "message #" + index;
-					index++;
-					return result;
-				}
-
-				return "q";
+			public void cancel() {
 			}
+
 		}).addSink(
 				new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
 		)
-		.setParallelism(3);
+				.setParallelism(3);
 
 		env.execute();
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4a7ec15d0532c..2fa2c26f8e1a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -32,12 +32,13 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.ConnectorSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Source that listens to a Kafka topic using the high level Kafka API.
- * 
+ *
  * @param <OUT>
  *            Type of the messages on the topic.
  */
@@ -59,9 +60,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
 	private static final String DEFAULT_GROUP_ID = "flink-group";
 
-	// We must read this in reachedEnd() to check for the end. We keep it to return it in
-	// next()
-	private OUT nextElement;
+	private volatile boolean isRunning = false;
 
 	/**
 	 * Creates a KafkaSource that consumes a topic.
@@ -78,14 +77,15 @@
 	 *            Synchronization time with zookeeper.
 	 */
 	public KafkaSource(String zookeeperAddress,
-			String topicId, String groupId,
-			DeserializationSchema<OUT> deserializationSchema,
-			long zookeeperSyncTimeMillis) {
+			String topicId,
+			String groupId,
+			DeserializationSchema<OUT> deserializationSchema,
+			long zookeeperSyncTimeMillis) {
 		this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
 	}
 
 	/**
 	 * Creates a KafkaSource that consumes a topic.
- * 
+ *
 	 * @param zookeeperAddress
 	 *            Address of the Zookeeper host (with port number).
* @param topicId @@ -100,9 +100,9 @@ public KafkaSource(String zookeeperAddress, * Custom properties for Kafka */ public KafkaSource(String zookeeperAddress, - String topicId, String groupId, - DeserializationSchema deserializationSchema, - long zookeeperSyncTimeMillis, Properties customProperties) { + String topicId, String groupId, + DeserializationSchema deserializationSchema, + long zookeeperSyncTimeMillis, Properties customProperties) { super(deserializationSchema); Preconditions.checkNotNull(zookeeperAddress, "ZK address is null"); Preconditions.checkNotNull(topicId, "Topic ID is null"); @@ -178,42 +178,31 @@ private void initializeConnection() { } @Override - public void open(Configuration config) throws Exception { - super.open(config); - initializeConnection(); - } - - @Override - public void close() throws Exception { - super.close(); - if (consumer != null) { + public void run(Object checkpointLock, Collector collector) throws Exception { + isRunning = true; + try { + while (isRunning && consumerIterator.hasNext()) { + OUT out = schema.deserialize(consumerIterator.next().message()); + if (schema.isEndOfStream(out)) { + break; + } + collector.collect(out); + } + } finally { consumer.shutdown(); } } @Override - public boolean reachedEnd() throws Exception { - if (nextElement != null) { - return false; - } else if (consumerIterator.hasNext()) { - OUT out = schema.deserialize(consumerIterator.next().message()); - if (schema.isEndOfStream(out)) { - return true; - } - nextElement = out; - } - return false; + public void open(Configuration config) throws Exception { + initializeConnection(); } @Override - public OUT next() throws Exception { - if (!reachedEnd()) { - OUT result = nextElement; - nextElement = null; - return result; - } else { - throw new RuntimeException("Source exhausted"); + public void cancel() { + isRunning = false; + if (consumer != null) { + consumer.shutdown(); } } - } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 51682abbb7dbb..d695b0912f939 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -70,7 +70,6 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema; import org.apache.flink.util.Collector; -import org.apache.flink.util.StringUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -316,9 +315,9 @@ public void testPersistentSourceWithOffsetUpdates() throws Exception { Assert.assertTrue("The offset seems incorrect, got " + o2, o2 > 50L); Assert.assertTrue("The offset seems incorrect, got " + o3, o3 > 50L); /** Once we have proper shutdown of streaming jobs, enable these tests - Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0)); - Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1)); - Assert.assertEquals("The offset 
seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/ + Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0)); + Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1)); + Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/ LOG.info("Manipulating offsets"); @@ -342,18 +341,22 @@ private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, fin DataStream> source = env.addSource( new PersistentKafkaSource>(topicName, new Utils.TypeInformationSerializationSchema>(new Tuple2(1,1), env.getConfig()), cc) ) - //add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have - // to play this trick. The problem is that we have to wait until all checkpoints are confirmed - .map(new MapFunction, Tuple2>() { - @Override - public Tuple2 map(Tuple2 value) throws Exception { - Thread.sleep(150); - return value; - } - }).setParallelism(3); + //add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have + // to play this trick. The problem is that we have to wait until all checkpoints are confirmed + .map(new MapFunction, Tuple2>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2 map(Tuple2 value) throws Exception { + Thread.sleep(150); + return value; + } + }).setParallelism(3); // verify data DataStream validIndexes = source.flatMap(new RichFlatMapFunction, Integer>() { + private static final long serialVersionUID = 1L; + int[] values = new int[valuesCount]; int count = 0; @@ -391,33 +394,32 @@ public void flatMap(Tuple2 value, Collector out) thro private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception { LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName); - DataStream> stream = env.addSource(new RichParallelSourceFunction>() { private static final long serialVersionUID = 1L; - int cnt = from; - int partition; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - partition = getRuntimeContext().getIndexOfThisSubtask(); - - } + boolean running = true; @Override - public boolean reachedEnd() throws Exception { - return cnt > to; + public void run(Object checkpointLock, Collector> collector) throws Exception { + LOG.info("Starting source."); + int cnt = from; + int partition = getRuntimeContext().getIndexOfThisSubtask(); + while (running) { + LOG.info("Writing " + cnt + " to partition " + partition); + collector.collect(new Tuple2(getRuntimeContext().getIndexOfThisSubtask(), cnt)); + if (cnt == to) { + LOG.info("Writer reached end."); + return; + } + cnt++; + } } @Override - public Tuple2 next() throws Exception { - LOG.info("Writing " + cnt + " to partition " + partition); - Tuple2 result = new Tuple2(getRuntimeContext().getIndexOfThisSubtask(), cnt); - cnt++; - return result; + public void cancel() { + LOG.info("Source got cancel()"); + running = false; } }).setParallelism(3); - stream.addSink(new KafkaSink>(brokerConnectionStrings, topicName, new Utils.TypeInformationSerializationSchema>(new Tuple2(1, 1), env.getConfig()), @@ -428,6 +430,8 @@ public Tuple2 next() throws Exception { } private static class T2Partitioner implements SerializableKafkaPartitioner { + private static final 
long serialVersionUID = 1L; + @Override public int partition(Object key, int numPartitions) { if(numPartitions != 3) { @@ -451,6 +455,8 @@ public void regularKafkaSourceTest() throws Exception { DataStreamSource> consuming = env.addSource( new KafkaSource>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema>(new Tuple2(1L, ""), env.getConfig()), 5000)); consuming.addSink(new SinkFunction>() { + private static final long serialVersionUID = 1L; + int elCnt = 0; int start = -1; BitSet validator = new BitSet(101); @@ -483,17 +489,25 @@ public void invoke(Tuple2 value) throws Exception { // add producing topology DataStream> stream = env.addSource(new SourceFunction>() { private static final long serialVersionUID = 1L; - int cnt = 0; + boolean running = true; @Override - public boolean reachedEnd() throws Exception { - return false; + public void run(Object checkpointLock, Collector> collector) throws Exception { + LOG.info("Starting source."); + int cnt = 0; + while (running) { + collector.collect(new Tuple2(1000L + cnt, "kafka-" + cnt++)); + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + } } @Override - public Tuple2 next() throws Exception { - Thread.sleep(100); - return new Tuple2(1000L + cnt, "kafka-" + cnt++); + public void cancel() { + LOG.info("Source got cancel()"); + running = false; } }); stream.addSink(new KafkaSink>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema>(new Tuple2(1L, ""), env.getConfig()))); @@ -519,6 +533,8 @@ public void tupleTestTopology() throws Exception { standardCC )); consuming.addSink(new RichSinkFunction>() { + private static final long serialVersionUID = 1L; + int elCnt = 0; int start = -1; BitSet validator = new BitSet(101); @@ -557,17 +573,27 @@ public void close() throws Exception { // add producing topology DataStream> stream = env.addSource(new SourceFunction>() { private static final long serialVersionUID = 1L; - int cnt = 0; + boolean running = true; @Override - public boolean reachedEnd() throws Exception { - return false; + public void run(Object checkpointLock, Collector> collector) throws Exception { + LOG.info("Starting source."); + int cnt = 0; + while (running) { + collector.collect(new Tuple2(1000L + cnt, "kafka-" + cnt++)); + LOG.info("Produced " + cnt); + + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + } } @Override - public Tuple2 next() throws Exception { - Thread.sleep(100); - return new Tuple2(1000L + cnt, "kafka-" + cnt++); + public void cancel() { + LOG.info("Source got cancel()"); + running = false; } }); stream.addSink(new KafkaSink>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema>(new Tuple2(1L, ""), env.getConfig()))); @@ -608,6 +634,8 @@ public void bigRecordTestTopology() throws Exception { new PersistentKafkaSource>(topic, serSchema, cc)); consuming.addSink(new SinkFunction>() { + private static final long serialVersionUID = 1L; + int elCnt = 0; @Override @@ -631,45 +659,47 @@ public void invoke(Tuple2 value) throws Exception { // add producing topology DataStream> stream = env.addSource(new RichSourceFunction>() { private static final long serialVersionUID = 1L; - boolean running = true; - long cnt; - transient Random rnd; + boolean running; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - cnt = 0; - rnd = new Random(1337); - + running = true; } @Override - public boolean reachedEnd() throws Exception { - return cnt > 10; - 
} - - @Override - public Tuple2 next() throws Exception { - Thread.sleep(100); - - if (cnt < 10) { - byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))]; - Tuple2 result = new Tuple2(cnt++, wl); + public void run(Object checkpointLock, Collector> collector) throws Exception { + LOG.info("Starting source."); + long cnt = 0; + Random rnd = new Random(1337); + while (running) { + // + byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))]; + collector.collect(new Tuple2(cnt++, wl)); LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length); - return result; - } else if (cnt == 10) { - Tuple2 result = new Tuple2(-1L, new byte[]{1}); - cnt++; - return result; - } else { - throw new RuntimeException("Source is exhausted."); + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + if(cnt == 10) { + LOG.info("Send end signal"); + // signal end + collector.collect(new Tuple2(-1L, new byte[]{1})); + running = false; + } } } + + @Override + public void cancel() { + LOG.info("Source got cancel()"); + running = false; + } }); stream.addSink(new KafkaSink>(brokerConnectionStrings, topic, - new Utils.TypeInformationSerializationSchema>(new Tuple2(0L, new byte[]{0}), env.getConfig())) + new Utils.TypeInformationSerializationSchema>(new Tuple2(0L, new byte[]{0}), env.getConfig())) ); tryExecute(env, "big topology test"); @@ -694,6 +724,8 @@ public void customPartitioningTestTopology() throws Exception { new Utils.TypeInformationSerializationSchema>(new Tuple2(1L, ""), env.getConfig()), standardCC)); consuming.addSink(new SinkFunction>() { + private static final long serialVersionUID = 1L; + int start = -1; BitSet validator = new BitSet(101); @@ -741,17 +773,25 @@ public void invoke(Tuple2 value) throws Exception { // add producing topology DataStream> stream = env.addSource(new SourceFunction>() { private static final long serialVersionUID = 1L; - int cnt = 0; + boolean running = true; @Override - public boolean reachedEnd() throws Exception { - return false; + public void run(Object checkpointLock, Collector> collector) throws Exception { + LOG.info("Starting source."); + int cnt = 0; + while (running) { + collector.collect(new Tuple2(1000L + cnt, "kafka-" + cnt++)); + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + } } @Override - public Tuple2 next() throws Exception { - Thread.sleep(100); - return new Tuple2(1000L + cnt, "kafka-" + cnt++); + public void cancel() { + LOG.info("Source got cancel()"); + running = false; } }); stream.addSink(new KafkaSink>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema>(new Tuple2(1L, ""), env.getConfig()), new CustomPartitioner())); @@ -765,6 +805,7 @@ public Tuple2 next() throws Exception { * This is for a topic with 3 partitions and Tuple2 */ private static class CustomPartitioner implements SerializableKafkaPartitioner { + private static final long serialVersionUID = 1L; @Override public int partition(Object key, int numPartitions) { @@ -794,6 +835,8 @@ public void simpleTestTopology() throws Exception { DataStreamSource consuming = env.addSource( new PersistentKafkaSource(topic, new JavaDefaultStringSchema(), standardCC)); consuming.addSink(new SinkFunction() { + private static final long serialVersionUID = 1L; + int elCnt = 0; int start = -1; BitSet validator = new BitSet(101); @@ -824,17 +867,24 @@ public void invoke(String value) throws Exception { DataStream stream = env.addSource(new SourceFunction() { private static final long serialVersionUID = 1L; 
+			boolean running = true;
-			int cnt = 0;
 
 			@Override
-			public boolean reachedEnd() throws Exception {
-				return false;
+			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect("kafka-" + cnt++);
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
 			}
 
 			@Override
-			public String next() throws Exception {
-				Thread.sleep(100);
-				return "kafka-" + cnt++;
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
 			}
 		});
 		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
@@ -856,26 +906,35 @@ public void brokerFailureTest() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
 
-			private int cnt = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return cnt == 200;
-			}
+			boolean running = true;
 
 			@Override
-			public String next() throws Exception {
-				String msg = "kafka-" + cnt++;
-				LOG.info("sending message = "+msg);
+			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					String msg = "kafka-" + cnt++;
+					collector.collect(msg);
+					LOG.info("sending message = "+msg);
+
+					if ((cnt - 1) % 20 == 0) {
+						LOG.debug("Sending message #{}", cnt - 1);
+					}
+					if(cnt == 200) {
+						LOG.info("Stopping to produce after 200 msgs");
+						break;
+					}
 
-				if ((cnt - 1) % 20 == 0) {
-					LOG.debug("Sending message #{}", cnt - 1);
-				}
-
-				return msg;
 				}
 			}
 
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
 		});
 		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
 				.setParallelism(1);
@@ -928,6 +987,8 @@ public void run() {
 		consuming.setParallelism(1);
 
 		consuming.addSink(new SinkFunction<String>() {
+			private static final long serialVersionUID = 1L;
+
 			int elCnt = 0;
 			int start = 0;
 			int numOfMessagesToBeCorrect = 100;
@@ -987,13 +1048,13 @@ public static void tryExecute(StreamExecutionEnvironment see, String name) throw
 		while (!(t instanceof SuccessException)) {
 			if(t == null) {
 				LOG.warn("Test failed with exception", good);
-				Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+				Assert.fail("Test failed with: " + good.getMessage());
 			}
 
 			t = t.getCause();
 			if (limit++ == 20) {
 				LOG.warn("Test failed with exception", good);
-				Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+				Assert.fail("Test failed with: " + good.getMessage());
 			}
 		}
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index d706b8cbabd40..93812c2d1ea2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -27,6 +27,7 @@
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
+import org.apache.flink.util.Collector; public class RMQSource extends ConnectorSource { private static final long serialVersionUID = 1L; @@ -40,7 +41,7 @@ public class RMQSource extends ConnectorSource { private transient QueueingConsumer consumer; private transient QueueingConsumer.Delivery delivery; - OUT out; + private transient volatile boolean running; public RMQSource(String HOST_NAME, String QUEUE_NAME, DeserializationSchema deserializationSchema) { @@ -70,6 +71,7 @@ private void initializeConnection() { @Override public void open(Configuration config) throws Exception { initializeConnection(); + running = true; } @Override @@ -84,47 +86,21 @@ public void close() throws Exception { } @Override - public boolean reachedEnd() throws Exception { - if (out != null) { - return true; - } - try { + public void run(Object checkpointLock, Collector out) throws Exception { + while (running) { delivery = consumer.nextDelivery(); - } catch (Exception e) { - throw new RuntimeException("Error while reading message from RMQ source from " + QUEUE_NAME - + " at " + HOST_NAME, e); - } - out = schema.deserialize(delivery.getBody()); - if (schema.isEndOfStream(out)) { - out = null; - return false; + OUT result = schema.deserialize(delivery.getBody()); + if (schema.isEndOfStream(result)) { + break; + } + + out.collect(result); } - return true; } @Override - public OUT next() throws Exception { - if (out != null) { - OUT result = out; - out = null; - return result; - } - - try { - delivery = consumer.nextDelivery(); - } catch (Exception e) { - throw new RuntimeException("Error while reading message from RMQ source from " + QUEUE_NAME - + " at " + HOST_NAME, e); - } - - out = schema.deserialize(delivery.getBody()); - if (schema.isEndOfStream(out)) { - throw new RuntimeException("RMQ source is at end for " + QUEUE_NAME + " at " + HOST_NAME); - } - OUT result = out; - out = null; - return result; + public void cancel() { + running = false; } - } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java index 0b47985d18136..a1a6d9bf0a9c0 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,8 @@ /** * Implementation of {@link SourceFunction} specialized to emit tweets from - * Twitter. + * Twitter. This is not a parallel source because the Twitter API only allows + * two concurrent connections. 
*/ public class TwitterSource extends RichSourceFunction { @@ -56,6 +58,8 @@ public class TwitterSource extends RichSourceFunction { private int maxNumberOfTweets; private int currentNumberOfTweets; + private transient volatile boolean isRunning; + /** * Create {@link TwitterSource} for streaming * @@ -86,6 +90,7 @@ public TwitterSource(String authPath, int numberOfTweets) { public void open(Configuration parameters) throws Exception { initializeConnection(); currentNumberOfTweets = 0; + isRunning = true; } /** @@ -148,13 +153,16 @@ protected void initializeClient(DefaultStreamingEndpoint endpoint, Authenticatio client.connect(); } - private void closeConnection() { + @Override + public void close() { if (LOG.isInfoEnabled()) { LOG.info("Initiating connection close"); } - client.stop(); + if (client != null) { + client.stop(); + } if (LOG.isInfoEnabled()) { LOG.info("Connection closed successfully"); @@ -201,30 +209,26 @@ public void setWaitSec(int waitSec) { } @Override - public boolean reachedEnd() throws Exception { - if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets){ - closeConnection(); - return true; - } + public void run(Object checkpointLock, Collector out) throws Exception { + while (isRunning) { + if (client.isDone()) { + if (LOG.isErrorEnabled()) { + LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent() + .getMessage()); + } + break; + } - if (client.isDone()) { - if (LOG.isErrorEnabled()) { - LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent() - .getMessage()); + out.collect(queue.take()); + + if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) { + break; } - return true; } - - return false; } @Override - public String next() throws Exception { - if (reachedEnd()) { - throw new RuntimeException("Twitter stream end reached."); - } - - String msg = queue.take(); - return msg; + public void cancel() { + isRunning = false; } } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java index e500fef0ffc11..a80c32aca67af 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java @@ -1,19 +1,19 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.flink.streaming.connectors.twitter; @@ -68,7 +68,7 @@ public void flatMap(String value, Collector { @@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception { .flatMap(new SelectLanguageFlatMap()) .map(new MapFunction>() { private static final long serialVersionUID = 1L; - + @Override public Tuple2 map(String value) throws Exception { return new Tuple2(value, 1); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java index f74d81cb6f87a..ecbcee594adca 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java @@ -21,20 +21,18 @@ import java.net.URI; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FileMonitoringFunction extends RichSourceFunction> { +public class FileMonitoringFunction implements SourceFunction> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class); @@ -51,28 +49,50 @@ public enum WatchType { private long interval; private WatchType watchType; - private FileSystem fileSystem; private Map offsetOfFiles; private Map modificationTimes; - private Queue> pendingFiles; + private volatile boolean isRunning; public FileMonitoringFunction(String path, long interval, WatchType watchType) { this.path = path; this.interval = interval; this.watchType = watchType; + this.modificationTimes = new HashMap(); + this.offsetOfFiles = new HashMap(); } @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.modificationTimes = new HashMap(); - this.offsetOfFiles = new HashMap(); - this.pendingFiles = new LinkedList>(); - fileSystem = FileSystem.get(new URI(path)); + public void run(Object checkpointLock, Collector> collector) throws Exception { + isRunning = true; + FileSystem fileSystem = FileSystem.get(new URI(path)); + + while 
(isRunning) { + List files = listNewFiles(fileSystem); + for (String filePath : files) { + if (watchType == WatchType.ONLY_NEW_FILES + || watchType == WatchType.REPROCESS_WITH_APPENDED) { + collector.collect(new Tuple3(filePath, 0L, -1L)); + offsetOfFiles.put(filePath, -1L); + } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) { + long offset = 0; + long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen(); + if (offsetOfFiles.containsKey(filePath)) { + offset = offsetOfFiles.get(filePath); + } + + collector.collect(new Tuple3(filePath, offset, fileSize)); + offsetOfFiles.put(filePath, fileSize); + + LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize); + } + } + + Thread.sleep(interval); + } } - private List listNewFiles() throws IOException { + private List listNewFiles(FileSystem fileSystem) throws IOException { List files = new ArrayList(); FileStatus[] statuses = fileSystem.listStatus(new Path(path)); @@ -105,44 +125,8 @@ private boolean isFiltered(String fileName, long modificationTime) { } } - - @Override - public boolean reachedEnd() throws Exception { - return false; - } - @Override - public Tuple3 next() throws Exception { - if (pendingFiles.size() > 0) { - return pendingFiles.poll(); - } else { - while (true) { - List files = listNewFiles(); - for (String filePath : files) { - if (watchType == WatchType.ONLY_NEW_FILES - || watchType == WatchType.REPROCESS_WITH_APPENDED) { - pendingFiles.add(new Tuple3(filePath, 0L, -1L)); - offsetOfFiles.put(filePath, -1L); - } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) { - long offset = 0; - long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen(); - if (offsetOfFiles.containsKey(filePath)) { - offset = offsetOfFiles.get(filePath); - } - - pendingFiles.add(new Tuple3(filePath, offset, fileSize)); - offsetOfFiles.put(filePath, fileSize); - - LOG.info("File added to queue: {}, {}, {}", filePath, offset, fileSize); - } - } - if (files.size() > 0) { - break; - } - Thread.sleep(interval); - } - } - - return pendingFiles.poll(); + public void cancel() { + isRunning = false; } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java index c6f44210a577c..19ee7e144c488 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java @@ -24,6 +24,7 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.util.Collector; import java.util.Iterator; import java.util.NoSuchElementException; @@ -34,11 +35,12 @@ public class FileSourceFunction extends RichParallelSourceFunction { private TypeInformation typeInfo; private transient TypeSerializer serializer; - private InputSplitProvider provider; private InputFormat format; - private Iterator splitIterator; - private transient OUT nextElement; + private transient InputSplitProvider provider; + private transient Iterator splitIterator; + + private volatile boolean isRunning; @SuppressWarnings("unchecked") public FileSourceFunction(InputFormat format, 
TypeInformation typeInfo) { @@ -58,7 +60,7 @@ public void open(Configuration parameters) throws Exception { if (splitIterator.hasNext()) { format.open(splitIterator.next()); } - + isRunning = true; } @Override @@ -115,30 +117,25 @@ public void remove() { } @Override - public boolean reachedEnd() throws Exception { - if (nextElement != null) { - return false; - } - nextElement = serializer.createInstance(); - nextElement = format.nextRecord(nextElement); - if (nextElement == null && splitIterator.hasNext()) { - format.open(splitIterator.next()); - return reachedEnd(); - } else if (nextElement == null) { - return true; + public void run(Object checkpointLock, Collector out) throws Exception { + isRunning = true; + + while (isRunning) { + OUT nextElement = serializer.createInstance(); + nextElement = format.nextRecord(nextElement); + if (nextElement == null && splitIterator.hasNext()) { + format.open(splitIterator.next()); + continue; + } else if (nextElement == null) { + break; + } + out.collect(nextElement); } - return false; } @Override - public OUT next() throws Exception { - if (reachedEnd()) { - throw new RuntimeException("End of FileSource reached."); - } - - OUT result = nextElement; - nextElement = null; - return result; + public void cancel() { + isRunning = false; } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 6654361179a8c..0b323bca39c0f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -21,15 +21,15 @@ import java.util.Collection; import java.util.Iterator; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; -public class FromElementsFunction extends RichSourceFunction { +public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; - private transient Iterator iterator; - private Iterable iterable; + private volatile boolean isRunning; + public FromElementsFunction(T... 
elements) { this.iterable = Arrays.asList(elements); } @@ -43,19 +43,17 @@ public FromElementsFunction(Iterable elements) { } @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - iterator = iterable.iterator(); - } + public void run(Object checkpointLock, Collector out) throws Exception { + isRunning = true; + Iterator it = iterable.iterator(); - @Override - public boolean reachedEnd() throws Exception { - return !iterator.hasNext(); + while (isRunning && it.hasNext()) { + out.collect(it.next()); + } } @Override - public T next() throws Exception { - return iterator.next(); + public void cancel() { + isRunning = false; } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java index 125b88b50f27b..4ee1334da46ad 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.api.functions.source; +import org.apache.flink.util.Collector; + import java.util.Iterator; public class FromIteratorFunction implements SourceFunction { @@ -25,17 +27,22 @@ public class FromIteratorFunction implements SourceFunction { Iterator iterator; + private volatile boolean isRunning; + public FromIteratorFunction(Iterator iterator) { this.iterator = iterator; } @Override - public boolean reachedEnd() throws Exception { - return !iterator.hasNext(); + public void run(Object checkpointLock, Collector out) throws Exception { + isRunning = true; + while (isRunning && iterator.hasNext()) { + out.collect(iterator.next()); + } } @Override - public T next() throws Exception { - return iterator.next(); + public void cancel() { + isRunning = false; } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java index fd86858ceb2a1..97f5c06a37a20 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; import java.util.Iterator; @@ -26,8 +27,11 @@ public class FromSplittableIteratorFunction extends RichParallelSourceFunctio private static final long serialVersionUID = 1L; - SplittableIterator fullIterator; - Iterator iterator; + private SplittableIterator fullIterator; + + private transient Iterator iterator; + + private volatile boolean isRunning; public FromSplittableIteratorFunction(SplittableIterator iterator) { this.fullIterator = iterator; @@ -38,15 +42,20 @@ public void open(Configuration parameters) throws Exception { int 
numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks(); int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask(); iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask]; + isRunning = true; } @Override - public boolean reachedEnd() throws Exception { - return !iterator.hasNext(); + public void run(Object checkpointLock, Collector out) throws Exception { + isRunning = true; + + while (isRunning && iterator.hasNext()) { + out.collect(iterator.next()); + } } @Override - public T next() throws Exception { - return iterator.next(); + public void cancel() { + isRunning = false; } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 6a1f93046b6e5..8e5c3b31b295b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -26,6 +26,7 @@ import java.net.SocketException; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,11 +45,7 @@ public class SocketTextStreamFunction extends RichSourceFunction { private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; - private transient StringBuffer buffer; - private transient BufferedReader reader; - - private boolean socketClosed; - private transient String nextElement; + private volatile boolean isRunning = false; public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { this.hostname = hostname; @@ -63,102 +60,89 @@ public void open(Configuration parameters) throws Exception { super.open(parameters); socket = new Socket(); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); - buffer = new StringBuffer(); - reader = new BufferedReader(new InputStreamReader( - socket.getInputStream())); - socketClosed = false; } @Override - public void close() throws Exception { - super.close(); - if (reader != null) { - reader.close(); - } - if (socket != null && !socket.isClosed()) { - try { - socket.close(); - } catch (IOException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Could not close open socket"); - } - } - } - + public void run(Object checkpointLock, Collector collector) throws Exception { + streamFromSocket(collector, socket); } - public String blockingRead(Socket socket) throws Exception { - - while (true) { - int data; - try { - data = reader.read(); - } catch (SocketException e) { - socketClosed = true; - break; - } + public void streamFromSocket(Collector collector, Socket socket) throws Exception { + isRunning = true; + try { + StringBuffer buffer = new StringBuffer(); + BufferedReader reader = new BufferedReader(new InputStreamReader( + socket.getInputStream())); + + while (isRunning) { + int data; + try { + data = reader.read(); + } catch (SocketException e) { + if (!isRunning) { + break; + } else { + throw e; + } + } - if (data == -1) { - socket.close(); - long retry = 0; - boolean success = false; - while (retry < maxRetry && !success) { - if (!retryForever) { - retry++; + if (data == -1) { + 
socket.close(); + long retry = 0; + boolean success = false; + while (retry < maxRetry && !success) { + if (!retryForever) { + retry++; + } + LOG.warn("Lost connection to server socket. Retrying in " + + (CONNECTION_RETRY_SLEEP / 1000) + " seconds..."); + try { + socket = new Socket(); + socket.connect(new InetSocketAddress(hostname, port), + CONNECTION_TIMEOUT_TIME); + success = true; + } catch (ConnectException ce) { + Thread.sleep(CONNECTION_RETRY_SLEEP); + } } - LOG.warn("Lost connection to server socket. Retrying in " - + (CONNECTION_RETRY_SLEEP / 1000) + " seconds..."); - try { - socket = new Socket(); - socket.connect(new InetSocketAddress(hostname, port), - CONNECTION_TIMEOUT_TIME); - success = true; - } catch (ConnectException ce) { - Thread.sleep(CONNECTION_RETRY_SLEEP); + + if (success) { + LOG.info("Server socket is reconnected."); + } else { + LOG.error("Could not reconnect to server socket."); + break; } + reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + continue; } - if (success) { - LOG.info("Server socket is reconnected."); - } else { - LOG.error("Could not reconnect to server socket."); - break; + if (data == delimiter) { + collector.collect(buffer.toString()); + buffer = new StringBuffer(); + } else if (data != '\r') { // ignore carriage return + buffer.append((char) data); } - reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - continue; } - if (data == delimiter) { - String result = buffer.toString(); - buffer = new StringBuffer(); - return result; - } else if (data != '\r') { // ignore carriage return - buffer.append((char) data); + if (buffer.length() > 0) { + collector.collect(buffer.toString()); } + } finally { + socket.close(); } - - return null; } - @Override - public boolean reachedEnd() throws Exception { - if (socketClosed) { - return false; - } - - nextElement = blockingRead(socket); - - return nextElement == null; - } - - @Override - public String next() throws Exception { - if (nextElement == null) { - reachedEnd(); + public void cancel() { + isRunning = false; + if (socket != null && !socket.isClosed()) { + try { + socket.close(); + } catch (IOException e) { + if (LOG.isErrorEnabled()) { + LOG.error("Could not close open socket"); + } + } } - - return nextElement; } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index e6ffdc1156b01..8c349e938ec67 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -19,66 +19,81 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; import java.io.Serializable; /** * Base interface for all stream data sources in Flink. The contract of a stream source - * is similar to an iterator - it is consumed as in the following pseudo code: + * is the following: When the source should start emitting elements the {@link #run} method + * is called with a {@link org.apache.flink.util.Collector} that can be used for emitting elements. + * The run method can run for as long as necessary. 
The source must, however, react to an
+ * invocation of {@link #cancel} by breaking out of its main loop.
  *
- * <pre>{@code
- * StreamSource<T> source = ...;
- * Collector<T> out = ...;
- * while (!source.reachedEnd()) {
- *   out.collect(source.next());
- * }
- * }</pre>
+ * <p>
+ * <b>Note about checkpointed sources</b>
+ *
+ * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+ * interface must ensure that state checkpointing, updating of internal state and emission of
+ * elements are not done concurrently. This is achieved by using the provided checkpointing lock
+ * object to protect update of state and emission of elements in a synchronized block.
+ * </p>
+ *
+ * <p>
+ * This is the basic pattern one should follow when implementing a (checkpointed) source:
+ * </p>
+ *
+ * <pre>{@code
+ *  public class ExampleSource implements SourceFunction<Long>, Checkpointed<Long> {
+ *      private long count = 0L;
+ *      private volatile boolean isRunning;
+ *
+ *      @Override
+ *      public void run(Object checkpointLock, Collector<Long> out) {
+ *          isRunning = true;
+ *          while (isRunning && count < 1000) {
+ *              synchronized (checkpointLock) {
+ *                  out.collect(count);
+ *                  count++;
+ *              }
+ *          }
+ *      }
+ *
+ *      @Override
+ *      public void cancel() {
+ *          isRunning = false;
+ *      }
+ *
+ *      @Override
+ *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+ *
+ *      @Override
+ *      public void restoreState(Long state) { this.count = state; }
 * }
 * }</pre>
- *
- * <b>Note about blocking behavior</b>
- * <p>This implementations of the methods in the stream sources must have certain guarantees about
- * blocking behavior. One of the two characteristics must be fulfilled.</p>
- * <ul>
- *     <li>The methods must react to thread interrupt calls and break out of blocking calls with
- *         an {@link InterruptedException}.</li>
- *     <li>The method may ignore interrupt calls and/or swallow InterruptedExceptions, if it is guaranteed
- *         that the method returns quasi immediately irrespectively of the input. This is true for example
- *         for file streams, where the call is guaranteed to return after a very short I/O delay in
- *         the order of milliseconds.</li>
- * </ul>
- *
- * @param <T> The type of the records produced by this source.
+ *
+ * @param <T> The type of the elements produced by this source.
  */
 public interface SourceFunction<T> extends Function, Serializable {
-
+
 	/**
-	 * Checks whether the stream has reached its end.
+	 * Starts the source. You can use the {@link org.apache.flink.util.Collector} parameter to emit
+	 * elements. Sources that implement
+	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} must lock on the
+	 * checkpoint lock (using a synchronized block) before updating internal state and/or emitting
+	 * elements. Also, the update of state and emission of elements must happen in the same
+	 * synchronized block.
 	 *
-	 * <p>This method must obey the contract about blocking behavior declared in the
-	 * description of this class.</p>
-	 *
-	 * @return True, if the end of the stream has been reached, false if more data is available.
-	 *
-	 * @throws InterruptedException The calling thread may be interrupted to pull the function out of this
-	 *                              method during checkpoints.
-	 * @throws Exception Any other exception that is thrown causes the source to fail and results in failure of
-	 *                   the streaming program, or triggers recovery, depending on the program setup.
+	 * @param checkpointLock The object to synchronize on when updating state and emitting elements.
+	 * @param out The collector to use for emitting elements.
 	 */
-	boolean reachedEnd() throws Exception;
-
+	void run(final Object checkpointLock, Collector<T> out) throws Exception;
 
 	/**
-	 * Produces the next record.
-	 *
-	 * <p>This method must obey the contract about blocking behavior declared in the
-	 * description of this class.</p>
-	 *
-	 * @return The next record produced by this stream source.
-	 *
-	 * @throws InterruptedException The calling thread may be interrupted to pull the function out of this
-	 *                              method during checkpoints.
-	 * @throws Exception Any other exception that is thrown causes the source to fail and results in failure of
-	 *                   the streaming program, or triggers recovery, depending on the program setup.
+	 * Cancels the source. Most sources will have a while loop inside the
+	 * {@link #run} method. You need to ensure that the source will break out of this loop. This
+	 * can be achieved by having a volatile field "isRunning" that is checked in the loop and that
+	 * is set to false in this method.
 	 */
-	T next() throws Exception;
+	void cancel();
 }
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b55e5d67baad6..7bb7780e8c760 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -41,7 +41,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator
 	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
 
 	@Override
-	public final void setup(Output<OUT> output, RuntimeContext runtimeContext) {
+	public void setup(Output<OUT> output, RuntimeContext runtimeContext) {
 		this.output = output;
 		this.executionConfig = runtimeContext.getExecutionConfig();
 		this.runtimeContext = runtimeContext;
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 09d1ef6e1c64b..852bfde610120 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
@@ -42,10 +43,16 @@ public AbstractUdfStreamOperator(F userFunction) {
 		this.userFunction = userFunction;
 	}
 
+	@Override
+	public final void setup(Output<OUT> output, RuntimeContext runtimeContext) {
+		super.setup(output, runtimeContext);
+		FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
+	}
+
+	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
-		FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
 		FunctionUtils.openFunction(userFunction, parameters);
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 9cdfb01a15fe1..e63349a84011b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -18,7 +18,11 @@
 package org.apache.flink.streaming.api.operators;

 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;

+/**
+ * {@link StreamOperator} for streaming sources.
+ */
 public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> implements StreamOperator<OUT> {

	private static final long serialVersionUID = 1L;
@@ -29,19 +33,11 @@ public StreamSource(SourceFunction<OUT> sourceFunction) {
		this.chainingStrategy = ChainingStrategy.HEAD;
	}

-	public void run() throws Exception {
-		while (true) {
-
-			synchronized (userFunction) {
-				if (userFunction.reachedEnd()) {
-					break;
-				}
-
-				OUT result = userFunction.next();
+	public void run(Object lockingObject, Collector<OUT> collector) throws Exception {
+		userFunction.run(lockingObject, collector);
+	}

-				output.collect(result);
-			}
-			Thread.yield();
-		}
+	public void cancel() {
+		userFunction.cancel();
	}
 }
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 95cbc8797ac0a..186fb3fbbbd93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -22,6 +22,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+/**
+ * Task for executing streaming sources.
+ *
+ * One important aspect of this is that the checkpointing and the emission of elements must never
+ * occur at the same time. The execution must be serial. This is achieved by having the contract
+ * with the SourceFunction that it must only modify its state or emit elements in
+ * a synchronized block that locks on the checkpointLock Object. Also, the modification of the state
+ * and the emission of elements must happen in the same block of code that is protected by the
+ * synchronized block.
+ *
+ * @param <OUT> Type of the output elements of this source.
+ */
 public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {

	private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
@@ -40,7 +52,7 @@ public void invoke() throws Exception {
			openOperator();
			operatorOpen = true;

-			streamOperator.run();
+			streamOperator.run(checkpointLock, outputHandler.getOutput());

			closeOperator();
			operatorOpen = false;
@@ -71,4 +83,10 @@ public void invoke() throws Exception {
			}
		}
	}
+
+	@Override
+	public void cancel() {
+		streamOperator.cancel();
+		super.cancel();
+	}
 }
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d458dacf986f3..bb641d9ce8d5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -50,7 +50,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs

	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

-	private final Object checkpointLock = new Object();
+	protected final Object checkpointLock = new Object();

	protected StreamConfig configuration;

@@ -191,9 +191,6 @@ public EventListener getSuperstepListener() {
	// Checkpoint and Restore
	// ------------------------------------------------------------------------

-	/**
-	 * Re-injects the user states into the map. Also set the state on the functions.
-	 */
	@Override
	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
		// here, we later resolve the state handle into the actual state by
@@ -230,13 +227,9 @@ public void setInitialState(StateHandle<Serializable> stateHandle) throws Except
		}
	}

-	/**
-	 * This method is either called directly by the checkpoint coordinator, or called
-	 * when all incoming channels have reported a barrier
-	 */
	@Override
	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-
+		synchronized (checkpointLock) {
			if (isRunning) {
				try {
@@ -293,6 +286,7 @@ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exceptio
				}
			}
		}
+	}

	@Override
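The serial-execution contract documented on SourceStreamTask is easiest to see with both sides of the lock next to each other. The following is a rough sketch, illustrative only (the class and field names are assumptions, not part of this patch): the emitting thread and the checkpointing thread synchronize on the same object, so a snapshot can never observe state from a half-finished emission.

    import org.apache.flink.util.Collector;

    // Illustrative sketch of the serial-execution contract (not part of this patch).
    public class CheckpointLockSketch {

        private final Object checkpointLock = new Object();
        private long count = 0;
        private volatile boolean isRunning = true;

        // Runs in the source thread (cf. SourceFunction.run()).
        void emitLoop(Collector<Long> out) {
            while (isRunning) {
                synchronized (checkpointLock) {
                    // State update and emission in the same guarded block.
                    count++;
                    out.collect(count);
                }
            }
        }

        // Runs in the checkpointing thread (cf. StreamTask.triggerCheckpoint()).
        long snapshot() {
            synchronized (checkpointLock) {
                return count; // Consistent: no emission is in flight.
            }
        }
    }

Keeping the state update and the emission in one synchronized block is what lets triggerCheckpoint() read a consistent count without stopping the source thread for longer than a single element.
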
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index 084973f4f32de..fef8a3113628c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
 import org.junit.Test;

 @SuppressWarnings("serial")
@@ -46,13 +47,11 @@ public void test() throws Exception {
	private static class TestSource extends RichParallelSourceFunction<Integer> {

		@Override
-		public boolean reachedEnd() throws Exception {
-			return true;
+		public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
		}

		@Override
-		public Integer next() throws Exception {
-			return null;
+		public void cancel() {
		}

		@Override
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index ca6057c801b0d..1dbbc00fae65b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -115,18 +115,17 @@ public String map(Long value) throws Exception {
	}

	private class TestSource<T> implements SourceFunction<T> {
-
+		private static final long serialVersionUID = 1L;

		@Override
-		public boolean reachedEnd() throws Exception {
-			return false;
+		public void run(Object checkpointLock, Collector<T> out) throws Exception {
+
		}

		@Override
-		public T next() throws Exception {
-			return null;
-		}
+		public void cancel() {
+		}
	}

	private class TestMap implements MapFunction<Long, String> {
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 9d4efdc4b349f..28586581b1c54 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -275,7 +275,7 @@ public void complexIntegrationTest4() throws Exception {
		env.addSource(new RectangleSource())
				.global()
				.map(new RectangleMapFunction())
-				.window(Delta.of(0.0, new MyDelta(), new Tuple2<RectangleClass, Integer>(new RectangleClass(100, 100), 0)))
+				.window(Delta.of(0.0, new MyDelta(), new Tuple2<Rectangle, Integer>(new Rectangle(100, 100), 0)))
				.mapWindow(new MyWindowMapFunction())
				.flatten()
				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
@@ -283,11 +283,11 @@ public void complexIntegrationTest4() throws Exception {
		env.execute();
	}

-	private static class MyDelta implements DeltaFunction<Tuple2<RectangleClass, Integer>> {
+	private static class MyDelta implements DeltaFunction<Tuple2<Rectangle, Integer>> {
		private static final long serialVersionUID = 1L;

		@Override
-		public double getDelta(Tuple2<RectangleClass, Integer> oldDataPoint, Tuple2<RectangleClass, Integer> newDataPoint) {
+		public double getDelta(Tuple2<Rectangle, Integer> oldDataPoint, Tuple2<Rectangle, Integer> newDataPoint) {
			return (newDataPoint.f0.b - newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
		}

@@ -484,33 +484,35 @@ private static class PojoSource implements SourceFunction<OuterPojo> {

		long cnt = 0;

		@Override
-		public boolean reachedEnd() throws Exception {
-			return cnt >= 20;
+		public void run(Object checkpointLock,
+				Collector<OuterPojo> out) throws Exception {
+			for (int i = 0; i < 20; i++) {
+				OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
+				cnt++;
+				out.collect(result);
+			}
		}

		@Override
-		public OuterPojo next() throws Exception {
-			OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
-			cnt++;
-			return result;
+		public void cancel() {
+
		}
	}

	private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {

		private static final long serialVersionUID = 1L;

-		int cnt = 0;
-
		@Override
-		public boolean reachedEnd() throws Exception {
-			return cnt >= 20;
+		public void run(Object checkpointLock,
+				Collector<Tuple2<Long, Tuple2<String, Long>>> out) throws Exception {
+			for (int i = 0; i < 20; i++) {
+				Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
+				out.collect(result);
+			}
		}

		@Override
-		public Tuple2<Long, Tuple2<String, Long>> next() throws Exception {
-			Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
-			cnt++;
-			return result;
+		public void cancel() {
+
		}
	}

@@ -611,46 +613,44 @@ public void flatMap(Long value, Collector<String> out) throws Exception {
		}
	}

-	private static class RectangleSource extends RichSourceFunction<RectangleClass> {
+	private static class RectangleSource extends RichSourceFunction<Rectangle> {
		private static final long serialVersionUID = 1L;
-		private transient RectangleClass rectangle;
-		private transient int cnt;
+		private transient Rectangle rectangle;

		public void open(Configuration parameters) throws Exception {
-			rectangle = new RectangleClass(100, 100);
-			cnt = 0;
+			rectangle = new Rectangle(100, 100);
		}

		@Override
-		public boolean reachedEnd() throws Exception {
-			return cnt >= 100;
+		public void run(Object checkpointLock,
+				Collector<Rectangle> out) throws Exception {
+			for (int i = 0; i < 100; i++) {
+				out.collect(rectangle);
+				rectangle = rectangle.next();
+			}
		}

		@Override
-		public RectangleClass next() throws Exception {
-			RectangleClass result = rectangle;
-			cnt++;
-			rectangle = rectangle.next();
-			return result;
+		public void cancel() {
		}
	}

-	private static class RectangleMapFunction implements MapFunction<RectangleClass, Tuple2<RectangleClass, Integer>> {
+	private static class RectangleMapFunction implements MapFunction<Rectangle, Tuple2<Rectangle, Integer>> {
		private static final long serialVersionUID = 1L;
		private int counter = 0;

		@Override
-		public Tuple2<RectangleClass, Integer> map(RectangleClass value) throws Exception {
-			return new Tuple2<RectangleClass, Integer>(value, counter++);
+		public Tuple2<Rectangle, Integer> map(Rectangle value) throws Exception {
+			return new Tuple2<Rectangle, Integer>(value, counter++);
		}
	}

-	private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<RectangleClass, Integer>,
-			Tuple2<RectangleClass, Integer>> {
+	private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<Rectangle, Integer>,
+			Tuple2<Rectangle, Integer>> {
		private static final long serialVersionUID = 1L;

		@Override
-		public void mapWindow(Iterable<Tuple2<RectangleClass, Integer>> values, Collector<Tuple2<RectangleClass, Integer>> out) throws Exception {
+		public void mapWindow(Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle, Integer>> out) throws Exception {
			out.collect(values.iterator().next());
		}

@@ -769,23 +769,21 @@ public String toString() {
		}
	}

-	public static class RectangleClass {
-
-		private static final long serialVersionUID = 1L;
+	public static class Rectangle {

		public int a;
		public int b;

		//default constructor to qualify as Flink POJO
-		public RectangleClass() {}
+		public Rectangle() {}

-		public RectangleClass(int a, int b) {
+		public Rectangle(int a, int b) {
			this.a = a;
			this.b = b;
		}

-		public RectangleClass next() {
-			return new RectangleClass(a + (b % 11), b + (a % 9));
+		public Rectangle next() {
+			return new Rectangle(a + (b % 11), b + (a % 9));
		}

		@Override
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index d5b2a4cf8d1f1..ec97984548c47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -159,22 +159,23 @@ public long getTimestamp(Integer value) {
				.getDiscretizedStream().addSink(new TestSink12());

		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
-
-			private int i = 1;
+			private static final long serialVersionUID = 1L;

			@Override
-			public boolean reachedEnd() throws Exception {
-				return i > 10;
+			public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+				for (int i = 
1; i <= 10; i++) { + out.collect(i); + } } @Override - public Integer next() throws Exception { - return i++; + public void cancel() { } - }); DataStream source3 = env.addSource(new RichParallelSourceFunction() { + private static final long serialVersionUID = 1L; + private int i = 1; @Override @@ -184,17 +185,16 @@ public void open(Configuration parameters) throws Exception { } @Override - public boolean reachedEnd() throws Exception { - return i > 11; + public void cancel() { } @Override - public Integer next() throws Exception { - int result = i; - i += 2; - return result; - } + public void run(Object checkpointLock, Collector out) throws Exception { + for (;i < 11; i += 2) { + out.collect(i); + } + } }); source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9()); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java index 90cb7b311e975..55f4addb287d8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.util.Collector; import org.junit.Test; public class StreamVertexTest { @@ -44,22 +45,23 @@ public class StreamVertexTest { private static Map data = new HashMap(); public static class MySource implements SourceFunction> { + private static final long serialVersionUID = 1L; + private Tuple1 tuple = new Tuple1(0); private int i = 0; @Override - public boolean reachedEnd() throws Exception { - return i >= 10; + public void run(Object checkpointLock, Collector> out) throws Exception { + for (int i = 0; i < 10; i++) { + tuple.f0 = i; + out.collect(tuple); + } } @Override - public Tuple1 next() throws Exception { - tuple.f0 = i; - i++; - return tuple; + public void cancel() { } - } public static class MyTask extends RichMapFunction, Tuple2> { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java new file mode 100644 index 0000000000000..ef440530716a5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.tasks; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.util.Collector; +import org.apache.flink.util.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({Task.class, ResultPartitionWriter.class}) +public class SourceStreamTaskTest extends StreamTaskTestBase { + + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; + + private static final int NETWORK_BUFFER_SIZE = 1024; + + /** + * This test ensures that the SourceStreamTask properly serializes checkpointing + * and element emission. This also verifies that there are no concurrent invocations + * of the checkpoint method on the source operator. + * + * The source emits elements and performs checkpoints. We have several checkpointer threads + * that fire checkpoint requests at the source task. + * + * If element emission and checkpointing are not in series the count of elements at the + * beginning of a checkpoint and at the end of a checkpoint are not the same because the + * source kept emitting elements while the checkpoint was ongoing. 
+ */ + @Test + public void testDataSourceTask() throws Exception { + final int NUM_ELEMENTS = 100; + final int NUM_CHECKPOINTS = 100; + final int NUM_CHECKPOINTERS = 1; + final int CHECKPOINT_INTERVAL = 5; // in ms + final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint + final int SOURCE_READ_DELAY = 1; // in ms + + List> outList = new ArrayList>(); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); + + StreamSource> sourceOperator = new StreamSource>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); + + final SourceStreamTask> sourceTask = new SourceStreamTask>(); + + TupleTypeInfo> typeInfo = new TupleTypeInfo>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + TypeSerializer> serializer = typeInfo.createSerializer(new ExecutionConfig()); + StreamRecordSerializer> streamSerializer = new StreamRecordSerializer>(typeInfo, new ExecutionConfig()); + + super.addOutput(outList, serializer); + + StreamConfig streamConfig = super.getStreamConfig(); + + streamConfig.setStreamOperator(sourceOperator); + streamConfig.setChainStart(); + streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper()); + streamConfig.setNumberOfOutputs(1); + + List outEdgesInOrder = new LinkedList(); + StreamNode sourceVertex = new StreamNode(null, 0, sourceOperator, "source", new LinkedList>(), SourceStreamTask.class); + StreamNode targetVertexDummy = new StreamNode(null, 1, sourceOperator, "target dummy", new LinkedList>(), SourceStreamTask.class); + + outEdgesInOrder.add(new StreamEdge(sourceVertex, targetVertexDummy, 0, new LinkedList(), new BroadcastPartitioner())); + streamConfig.setOutEdgesInOrder(outEdgesInOrder); + streamConfig.setNonChainedOutputs(outEdgesInOrder); + streamConfig.setTypeSerializerOut1(streamSerializer); + streamConfig.setVertexID(0); + + super.registerTask(sourceTask); + + ExecutorService executor = Executors.newFixedThreadPool(10); + Future[] checkpointerResults = new Future[NUM_CHECKPOINTERS]; + for (int i = 0; i < NUM_CHECKPOINTERS; i++) { + checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask)); + } + + + try { + sourceTask.invoke(); + } catch (Exception e) { + System.err.println(StringUtils.stringifyException(e)); + Assert.fail("Invoke method caused exception."); + } + + // Get the result from the checkpointers, if these threw an exception it + // will be rethrown here + for (int i = 0; i < NUM_CHECKPOINTERS; i++) { + if (!checkpointerResults[i].isDone()) { + checkpointerResults[i].cancel(true); + } + if (!checkpointerResults[i].isCancelled()) { + checkpointerResults[i].get(); + } + } + + Assert.assertEquals(NUM_ELEMENTS, outList.size()); + } + + private static class MockSource implements SourceFunction>, Checkpointed { + + private static final long serialVersionUID = 1; + + private int maxElements; + private int checkpointDelay; + private int readDelay; + + private volatile int count; + private volatile long lastCheckpointId = -1; + + private Semaphore semaphore; + + private volatile boolean isRunning = true; + + public MockSource(int maxElements, int checkpointDelay, int readDelay) { + this.maxElements = maxElements; + this.checkpointDelay = checkpointDelay; + this.readDelay = readDelay; + this.count = 0; + semaphore = new Semaphore(1); + } + + @Override + public void run(final Object lockObject, Collector> out) { + while (isRunning && count < maxElements) { + // simulate some work + try { + Thread.sleep(readDelay); + } 
catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+
+				synchronized (lockObject) {
+					out.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
+					count++;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of snapshotState.");
+			}
+			int startCount = count;
+			lastCheckpointId = checkpointId;
+
+			long sum = 0;
+			for (int i = 0; i < checkpointDelay; i++) {
+				sum += new Random().nextLong();
+			}
+
+			if (startCount != count) {
+				semaphore.release();
+				// This means that elements were emitted while the snapshot was ongoing
+				Assert.fail("Count is different at start and end of snapshot.");
+			}
+			semaphore.release();
+			return sum;
+		}
+
+		@Override
+		public void restoreState(Serializable state) {
+
+		}
+	}
+
+	/**
+	 * This calls triggerCheckpoint on the given task with the given interval.
+	 */
+	private static class Checkpointer implements Callable<Boolean> {
+		private final int numCheckpoints;
+		private final int checkpointInterval;
+		private final AtomicLong checkpointId;
+		private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;
+
+		public Checkpointer(int numCheckpoints, int checkpointInterval, StreamTask<Tuple2<Long, Integer>, ?> task) {
+			this.numCheckpoints = numCheckpoints;
+			checkpointId = new AtomicLong(0);
+			sourceTask = task;
+			this.checkpointInterval = checkpointInterval;
+		}
+
+		@Override
+		public Boolean call() throws Exception {
+			for (int i = 0; i < numCheckpoints; i++) {
+				long currentCheckpointId = checkpointId.getAndIncrement();
+				sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
+				Thread.sleep(checkpointInterval);
+			}
+			return true;
+		}
+	}
+}
+
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
new file mode 100644
index 0000000000000..9864115376113
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.types.Record; +import org.apache.flink.util.MutableObjectIterator; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StreamMockEnvironment implements Environment { + + private final MemoryManager memManager; + + private final IOManager ioManager; + + private final InputSplitProvider inputSplitProvider; + + private final Configuration jobConfiguration; + + private final Configuration taskConfiguration; + + private final List inputs; + + private final List outputs; + + private final JobID jobID = new JobID(); + + private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager(); + + private final int bufferSize; + + public StreamMockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { + this.jobConfiguration = new Configuration(); + this.taskConfiguration = new Configuration(); + this.inputs = new LinkedList(); + this.outputs = new LinkedList(); + + this.memManager = new DefaultMemoryManager(memorySize, 1); + this.ioManager = new IOManagerAsync(); + this.inputSplitProvider = inputSplitProvider; + this.bufferSize = bufferSize; + } + + public IteratorWrappingTestSingleInputGate addInput(MutableObjectIterator inputIterator) { + try { + final 
IteratorWrappingTestSingleInputGate reader = new IteratorWrappingTestSingleInputGate(bufferSize, Record.class, inputIterator); + + inputs.add(reader.getInputGate()); + + return reader; + } + catch (Throwable t) { + throw new RuntimeException("Error setting up mock readers: " + t.getMessage(), t); + } + } + + public void addOutput(final List outputList, final TypeSerializer serializer) { + try { + // The record-oriented writers wrap the buffer writer. We mock it + // to collect the returned buffers and deserialize the content to + // the output list + BufferProvider mockBufferProvider = mock(BufferProvider.class); + when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer() { + + @Override + public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { + return new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class)); + } + }); + + ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class); + when(mockWriter.getNumberOfOutputChannels()).thenReturn(1); + when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider); + + final RecordDeserializer> recordDeserializer = new AdaptiveSpanningRecordDeserializer>(); + final NonReusingDeserializationDelegate delegate = new NonReusingDeserializationDelegate(serializer); + + // Add records from the buffer to the output list + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + Buffer buffer = (Buffer) invocationOnMock.getArguments()[0]; + + recordDeserializer.setNextBuffer(buffer); + + while (recordDeserializer.hasUnfinishedData()) { + RecordDeserializer.DeserializationResult result = recordDeserializer.getNextRecord(delegate); + + if (result.isFullRecord()) { + outputList.add(delegate.getInstance()); + } + + if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER + || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { + break; + } + } + + return null; + } + }).when(mockWriter).writeBuffer(any(Buffer.class), anyInt()); + + outputs.add(mockWriter); + } + catch (Throwable t) { + t.printStackTrace(); + fail(t.getMessage()); + } + } + + @Override + public Configuration getTaskConfiguration() { + return this.taskConfiguration; + } + + @Override + public MemoryManager getMemoryManager() { + return this.memManager; + } + + @Override + public IOManager getIOManager() { + return this.ioManager; + } + + @Override + public JobID getJobID() { + return this.jobID; + } + + @Override + public Configuration getJobConfiguration() { + return this.jobConfiguration; + } + + @Override + public int getNumberOfSubtasks() { + return 1; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return this.inputSplitProvider; + } + + @Override + public String getTaskName() { + return null; + } + + @Override + public String getTaskNameWithSubtasks() { + return null; + } + + @Override + public ClassLoader getUserClassLoader() { + return getClass().getClassLoader(); + } + + @Override + public Map> getDistributedCacheEntries() { + return Collections.emptyMap(); + } + + @Override + public ResultPartitionWriter getWriter(int index) { + return outputs.get(index); + } + + @Override + public ResultPartitionWriter[] getAllWriters() { + return outputs.toArray(new ResultPartitionWriter[outputs.size()]); + } + + @Override + public InputGate getInputGate(int index) { + return inputs.get(index); + } + + @Override + public InputGate[] 
getAllInputGates() { + InputGate[] gates = new InputGate[inputs.size()]; + inputs.toArray(gates); + return gates; + } + + @Override + public JobVertexID getJobVertexId() { + return new JobVertexID(new byte[16]); + } + + @Override + public ExecutionAttemptID getExecutionId() { + return new ExecutionAttemptID(0L, 0L); + } + + @Override + public BroadcastVariableManager getBroadcastVariableManager() { + return this.bcVarManager; + } + + @Override + public void reportAccumulators(Map> accumulators) { + // discard, this is only for testing + } + + @Override + public void acknowledgeCheckpoint(long checkpointId) { + } + + @Override + public void acknowledgeCheckpoint(long checkpointId, StateHandle state) { + } +} + diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java new file mode 100644 index 0000000000000..f1a36c8593ec8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.types.Record; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; + +import java.util.List; + + +public abstract class StreamTaskTestBase { + + protected long memorySize = 0; + + protected StreamMockEnvironment mockEnv; + + public void initEnvironment(long memorySize, int bufferSize) { + this.memorySize = memorySize; + this.mockEnv = new StreamMockEnvironment(this.memorySize, new MockInputSplitProvider(), bufferSize); + } + + public IteratorWrappingTestSingleInputGate addInput(MutableObjectIterator input, int groupId) { + final IteratorWrappingTestSingleInputGate reader = addInput(input, groupId, true); + + return reader; + } + + public IteratorWrappingTestSingleInputGate addInput(MutableObjectIterator input, int groupId, boolean read) { + final IteratorWrappingTestSingleInputGate reader = this.mockEnv.addInput(input); + TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration()); + conf.addInputToGroup(groupId); + conf.setInputSerializer(RecordSerializerFactory.get(), groupId); + + if (read) { + reader.read(); + } + + return reader; + } + + public void addOutput(List output, TypeSerializer serializer) { + this.mockEnv.addOutput(output, serializer); + TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration()); + conf.addOutputShipStrategy(ShipStrategyType.FORWARD); + conf.setOutputSerializer(RecordSerializerFactory.get()); + } + + public Configuration getConfiguration() { + return this.mockEnv.getTaskConfiguration(); + } + + public StreamConfig getStreamConfig() { + return new StreamConfig(this.mockEnv.getTaskConfiguration()); + } + + public void registerTask(AbstractInvokable task) { + task.setEnvironment(this.mockEnv); + task.registerInputOutput(); + } + + public MemoryManager getMemoryManager() { + return this.mockEnv.getMemoryManager(); + } + + @After + public void shutdownIOManager() throws Exception { + this.mockEnv.getIOManager().shutdown(); + Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown()); + } + + @After + public void shutdownMemoryManager() throws Exception { + if (this.memorySize > 0) { + MemoryManager memMan = getMemoryManager(); + if (memMan != null) { + Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty()); + memMan.shutdown(); + } + } + } +} + diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java index 95cb65c0b1928..77600f3512766 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java @@ -34,9 +34,7 @@ public static List createAndExecute(SourceFunction sourceFunction) thr } try { Collector collector = new MockOutput(outputs); - while (!sourceFunction.reachedEnd()) { - collector.collect(sourceFunction.next()); - } + sourceFunction.run(new Object(), collector); } catch (Exception e) { throw new RuntimeException("Cannot invoke source.", e); } diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index 7d985ab5b2624..26829425118e1 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; @@ -110,20 +111,24 @@ private static class RandomFibonacciSource implements SourceFunction next() throws Exception { - int first = rnd.nextInt(BOUND / 2 - 1) + 1; - int second = rnd.nextInt(BOUND / 2 - 1) + 1; + public void run(Object checkpointLock, Collector> collector) throws Exception { + + while (isRunning) { + int first = rnd.nextInt(BOUND / 2 - 1) + 1; + int second = rnd.nextInt(BOUND / 2 - 1) + 1; - Thread.sleep(500L); - return new Tuple2(first, second); + collector.collect(new Tuple2(first, second)); + Thread.sleep(500L); + } } + @Override + public void cancel() { + isRunning = false; + } } /** diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index a4a350e6b333a..8d7e5def8c278 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.util.Collector; import java.util.Random; @@ -103,6 +104,7 @@ public static class GradeSource implements SourceFunction outTuple; + private volatile boolean isRunning = true; public GradeSource() { rand = new Random(); @@ -110,18 +112,19 @@ public GradeSource() { } @Override - public boolean reachedEnd() throws Exception { - return false; + public void run(Object checkpointLock, Collector> out) throws Exception { + while (isRunning) { + outTuple.f0 = names[rand.nextInt(names.length)]; + outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1; + Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); + out.collect(outTuple); + } } @Override - public Tuple2 next() throws Exception { - outTuple.f0 = 
names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
-			Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			return outTuple;
+		public void cancel() {
+			isRunning = false;
		}
-
	}

	/**
@@ -132,30 +135,36 @@ public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {

		private transient Random rand;
		private transient Tuple2<String, Integer> outTuple;
+		private volatile boolean isRunning;

		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			rand = new Random();
			outTuple = new Tuple2<String, Integer>();
+			isRunning = true;
		}

+		@Override
-		public boolean reachedEnd() throws Exception {
-			return false;
+		public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (isRunning) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				out.collect(outTuple);
+			}
		}

		@Override
-		public Tuple2<String, Integer> next() throws Exception {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
-			Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			return outTuple;
+		public void cancel() {
+			isRunning = false;
		}
-
	}

	public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> {

+		private static final long serialVersionUID = 1L;
+
		private String[] record;

		public MySourceMap() {
@@ -188,6 +197,9 @@ public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
	}

	public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
		private int counter;

		public MyTimestamp(int starttime) {
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 9fb7caea19b90..1b30f595c0629 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -94,17 +94,24 @@ public static class NewDataSource implements SourceFunction<Integer> {

		private static final long serialVersionUID = 1L;
		private static final int NEW_DATA_SLEEP_TIME = 1000;

+		private volatile boolean isRunning = true;
+
		@Override
-		public boolean reachedEnd() throws Exception {
-			return false;
+		public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+			while (isRunning) {
+				collector.collect(getNewData());
+			}
		}

-		@Override
-		public Integer next() throws Exception {
+		private Integer getNewData() throws InterruptedException {
			Thread.sleep(NEW_DATA_SLEEP_TIME);
			return 1;
		}
-
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
	}

	/**
@@ -115,22 +122,24 @@ public static class FiniteNewDataSource implements SourceFunction<Integer> {

		private static final long serialVersionUID = 1L;
		private int counter;

-		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(5);
-			counter++;
-			return 1;
-		}
-
		@Override
-		public boolean reachedEnd() throws Exception {
-			return counter >= 50;
+		public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+			Thread.sleep(15);
+			while (counter < 50) {
+				collector.collect(getNewData());
+			}
		}

		@Override
-		public Integer next() throws Exception {
-			return getNewData();
+		public void cancel() {
+			// No cleanup needed
		}

+		private Integer getNewData() throws InterruptedException {
+			Thread.sleep(5);
+			counter++;
+			return 1;
+ } } /** @@ -141,17 +150,26 @@ public static class TrainingDataSource implements SourceFunction { private static final long serialVersionUID = 1L; private static final int TRAINING_DATA_SLEEP_TIME = 10; + private volatile boolean isRunning = true; + @Override - public boolean reachedEnd() throws Exception { - return false; + public void run(Object checkpointLock, Collector collector) throws Exception { + while (isRunning) { + collector.collect(getTrainingData()); + } + } - @Override - public Integer next() throws Exception { + private Integer getTrainingData() throws InterruptedException { Thread.sleep(TRAINING_DATA_SLEEP_TIME); return 1; - } + } + + @Override + public void cancel() { + isRunning = false; + } } /** @@ -162,21 +180,22 @@ public static class FiniteTrainingDataSource implements SourceFunction private static final long serialVersionUID = 1L; private int counter = 0; - private Integer getTrainingData() throws InterruptedException { - counter++; - return 1; - } - @Override - public boolean reachedEnd() throws Exception { - return counter >= 8200; + public void run(Object checkpointLock, Collector collector) throws Exception { + while (counter < 8200) { + collector.collect(getTrainingData()); + } } @Override - public Integer next() throws Exception { - return getTrainingData(); + public void cancel() { + // No cleanup needed } + private Integer getTrainingData() throws InterruptedException { + counter++; + return 1; + } } public static class LinearTimestamp implements Timestamp { diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index f0ebccc8135c7..2dd937820962b 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.streaming.examples.windowing; @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; +import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; @@ -54,24 +55,26 @@ public static void main(String[] args) throws Exception { DataStream> source = env .addSource(new SourceFunction>() { - int index = 0; + private static final long serialVersionUID = 1L; @Override - public boolean reachedEnd() throws Exception { - return index >= input.size(); + public void run(Object checkpointLock, Collector> collector) + throws Exception { + for (Tuple3 value : input) { + // We sleep three seconds between every output so we + // can see whether we properly detect sessions + // before the next start for a specific id + collector.collect(value); + if (!fileOutput) { + System.out.println("Collected: " + value); + Thread.sleep(3000); + } + } } @Override - public Tuple3 next() throws Exception { - Tuple3 result = input.get(index); - index++; - if (!fileOutput) { - System.out.println("Collected: " + result); - Thread.sleep(3000); - } - return result; + public void cancel() { } - }); // We create sessions for each id with max timeout of 3 time units diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java index 68229f3595898..850e30df051fa 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.streaming.examples.windowing; @@ -26,17 +26,18 @@ import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.Random; /** -* An example of grouped stream windowing where different eviction and trigger -* policies can be used. A source fetches events from cars every 1 sec -* containing their id, their current speed (kmh), overall elapsed distance (m) -* and a timestamp. The streaming example triggers the top speed of each car -* every x meters elapsed for the last y seconds. -*/ + * An example of grouped stream windowing where different eviction and trigger + * policies can be used. A source fetches events from cars every 1 sec + * containing their id, their current speed (kmh), overall elapsed distance (m) + * and a timestamp. The streaming example triggers the top speed of each car + * every x meters elapsed for the last y seconds. + */ public class TopSpeedWindowingExample { // ************************************************************************* @@ -62,6 +63,9 @@ public static void main(String[] args) throws Exception { .window(Time.of(evictionSec, new CarTimestamp())) .every(Delta.of(triggerMeters, new DeltaFunction>() { + private static final long serialVersionUID = 1L; + + @Override public double getDelta( Tuple4 oldDataPoint, @@ -90,7 +94,7 @@ private static class CarSource implements SourceFunction> collector) + throws Exception { + + while (isRunning) { + Thread.sleep(1000); + for (int carId = 0; carId < speeds.length; carId++) { + if (rand.nextBoolean()) { + speeds[carId] = Math.min(100, speeds[carId] + 5); + } else { + speeds[carId] = Math.max(0, speeds[carId] - 5); + } + distances[carId] += speeds[carId] / 3.6d; + Tuple4 record = new Tuple4(carId, + speeds[carId], distances[carId], System.currentTimeMillis()); + collector.collect(record); + } + } } @Override - public Tuple4 next() throws Exception { - if (rand.nextBoolean()) { - speeds[carId] = Math.min(100, speeds[carId] + 5); - } else { - speeds[carId] = Math.max(0, speeds[carId] - 5); - } - distances[carId] += speeds[carId] / 3.6d; - Tuple4 record = new Tuple4(carId, - speeds[carId], distances[carId], System.currentTimeMillis()); - carId++; - if (carId >= speeds.length) { - carId = 0; - } - return record; + public void cancel() { + isRunning = false; } - } private static class ParseCarData extends @@ -141,6 +147,7 @@ public Tuple4 map(String record) { } private static class CarTimestamp implements Timestamp> { + private static final long serialVersionUID = 1L; @Override public long getTimestamp(Tuple4 value) { @@ -169,8 +176,7 @@ private static boolean parseParameters(String[] args) { inputPath = args[0]; outputPath = args[1]; } else { - System.err - .println("Usage: TopSpeedWindowingExample "); + 
System.err.println("Usage: TopSpeedWindowingExample "); return false; } } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 62e24d332d365..b2ccf8cc75228 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction} import org.apache.flink.types.StringValue -import org.apache.flink.util.SplittableIterator +import org.apache.flink.util.{Collector, SplittableIterator} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -399,20 +399,20 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { val typeInfo = implicitly[TypeInformation[T]] javaEnv.addSource(cleanFun).returns(typeInfo) } - - /** + + /** * Create a DataStream using a user defined source function for arbitrary * source functionality. * */ - def addSource[T: ClassTag: TypeInformation](function: () => T): DataStream[T] = { + def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = { require(function != null, "Function must not be null.") val sourceFunction = new SourceFunction[T] { val cleanFun = StreamExecutionEnvironment.clean(function) - - override def reachedEnd(): Boolean = false - - override def next(): T = cleanFun() + override def run(lockObject: AnyRef, out: Collector[T]) { + cleanFun(out) + } + override def cancel() = {} } addSource(sourceFunction) } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index 1e2979534b262..64e1f24d8be12 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -22,8 +22,7 @@ import java.util.Map; import java.util.Random; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.configuration.ConfigConstants; @@ -32,13 +31,16 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.BeforeClass; 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 1e2979534b262..64e1f24d8be12 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -22,8 +22,7 @@
 import java.util.Map;
 import java.util.Random;

-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.configuration.ConfigConstants;
@@ -32,13 +31,16 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;

 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -112,28 +114,14 @@ public void runCheckpointedProgram() {
             stream
                     // -------------- first vertex, chained to the source ----------------
-
-                    .filter(new FilterFunction<String>() {
-                        @Override
-                        public boolean filter(String value) {
-                            return value.length() < 100;
-                        }
-                    })
-                    .map(new MapFunction<String, PrefixCount>() {
-
-                        @Override
-                        public PrefixCount map(String value) {
-                            return new PrefixCount(value.substring(0, 1), value, 1L);
-                        }
-                    })
+                    .filter(new StringRichFilterFunction())

                     // -------------- second vertex - the stateful one that also fails ----------------
-
+                    .map(new StringPrefixCountRichMapFunction())
                     .startNewChain()
                     .map(new StatefulCounterFunction())

                     // -------------- third vertex - reducer and the sink ----------------
-
                     .groupBy("prefix")
                     .reduce(new OnceFailingReducer(NUM_STRINGS))
                     .addSink(new RichSinkFunction<PrefixCount>() {
@@ -161,15 +149,28 @@ public void invoke(PrefixCount value) {

             env.execute();

+            long filterSum = 0;
+            for (long l : StringRichFilterFunction.counts) {
+                filterSum += l;
+            }
+
+            long mapSum = 0;
+            for (long l : StringPrefixCountRichMapFunction.counts) {
+                mapSum += l;
+            }
+
             long countSum = 0;
             for (long l : StatefulCounterFunction.counts) {
                 countSum += l;
             }
-
+
             // verify that we counted exactly right
-            // this line should be uncommented once the "exactly one off by one" is fixed
-//            assertEquals(NUM_STRINGS, countSum);
+            // if this fails we see at which point the count is off
+            assertEquals(NUM_STRINGS, filterSum);
+            assertEquals(NUM_STRINGS, mapSum);
+            assertEquals(NUM_STRINGS, countSum);
         }
         catch (Exception e) {
             e.printStackTrace();
@@ -181,8 +182,8 @@ public void invoke(PrefixCount value) {
     //  Custom Functions
     // --------------------------------------------------------------------------------------------

-    private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-            implements Checkpointed<Long> {
+    private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
+            implements Checkpointed<Long>, ParallelSourceFunction<String> {

         private final long numElements;

@@ -192,6 +193,14 @@ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
         private long index;
         private int step;

+        private volatile boolean isRunning;
+
+        static final long[] counts = new long[PARALLELISM];
+        @Override
+        public void close() {
+            counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
+        }
+
         StringGeneratingSourceFunction(long numElements) {
             this.numElements = numElements;

@@ -206,23 +215,29 @@ public void open(Configuration parameters) {
             if (index == 0) {
                 index = getRuntimeContext().getIndexOfThisSubtask();
             }
+            isRunning = true;
         }

         @Override
-        public boolean reachedEnd() throws Exception {
-            return index >= numElements;
-        }
+        public void run(Object lockingObject, Collector<String> out) throws Exception {
+            while (isRunning && index < numElements) {
+                char first = (char) ((index % 40) + 40);

-        @Override
-        public String next() throws Exception {
-            char first = (char) ((index % 40) + 40);
+                stringBuilder.setLength(0);
+                stringBuilder.append(first);

-            stringBuilder.setLength(0);
-            stringBuilder.append(first);
+                String result = randomString(stringBuilder, rnd);

-            String result = randomString(stringBuilder, rnd);
-            index += step;
-            return result;
+                synchronized (lockingObject) {
+                    index += step;
+                    out.collect(result);
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
         }

         @Override
@@ -250,9 +265,7 @@ private static String randomString(StringBuilder bld, Random rnd) {

     private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
             implements Checkpointed<Long> {
-        static final long[] counts = new long[PARALLELISM];
-        private long count = 0;

         @Override
         public PrefixCount map(PrefixCount value) throws Exception {
@@ -260,6 +273,10 @@ public PrefixCount map(PrefixCount value) throws Exception {
             return value;
         }

+        static final long[] counts = new long[PARALLELISM];
+
+        private long count = 0;
+
         @Override
         public void close() {
             counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
@@ -334,4 +351,61 @@ public String toString() {
             return prefix + " / " + value;
         }
     }
+
+    private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+
+        @Override
+        public boolean filter(String value) {
+            count++;
+            return value.length() < 100;
+        }
+
+        static final long[] counts = new long[PARALLELISM];
+
+        private long count = 0;
+
+        @Override
+        public void close() {
+            counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+        }
+
+        @Override
+        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+            return count;
+        }
+
+        @Override
+        public void restoreState(Long state) {
+            count = state;
+        }
+    }
+
+    private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
+
+        @Override
+        public PrefixCount map(String value) {
+            count++;
+            return new PrefixCount(value.substring(0, 1), value, 1L);
+        }
+
+        static final long[] counts = new long[PARALLELISM];
+
+        private long count = 0;
+
+        @Override
+        public void close() {
+            counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+        }
+
+        @Override
+        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+            return count;
+        }
+
+        @Override
+        public void restoreState(Long state) {
+            count = state;
+        }
+    }
 }
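An aside on the verification pattern the test above adopts (illustrative only; `CountingIdentityMap` and the inlined `PARALLELISM` value are hypothetical names, not part of the patch): each operator records its per-subtask element count into a static array in `close()`, and the test sums the arrays afterwards to pin down the stage at which a count goes off. A reduced sketch of the same idea:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

// Sketch of the per-subtask counting pattern used by the test functions above.
public class CountingIdentityMap extends RichMapFunction<String, String> {

    private static final long serialVersionUID = 1L;

    private static final int PARALLELISM = 4;

    // one slot per subtask; written once in close(), summed by the test afterwards
    static final long[] counts = new long[PARALLELISM];

    private long count;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void close() {
        counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
    }
}
```

This only works because the mini cluster used by the test runs all subtasks in a single JVM, so the static array is shared between the operator instances and the test code.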
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index a25024eb1694d..0b99e04fab3c6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -19,20 +19,11 @@
 package org.apache.flink.test.recovery;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.charset.Charset;
-import java.util.HashSet;
 import java.util.UUID;

 import org.apache.commons.io.FileUtils;
@@ -46,6 +37,9 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;

 /**
  * Test for streaming program behaviour in case of TaskManager failure
@@ -68,11 +62,6 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailureRecoveryTest {

     @Override
     public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
-        final File tempTestOutput = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
-                UUID.randomUUID().toString());
-
-        assertTrue("Cannot create directory for temp output", tempTestOutput.mkdirs());
-
         final File tempCheckpointDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
                 UUID.randomUUID().toString());

@@ -88,64 +77,28 @@ public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
         DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))

                 // add a non-chained no-op map to test the chain state restore logic
-                .rebalance().map(new MapFunction<Long, Long>() {
+                .map(new MapFunction<Long, Long>() {
                     @Override
                     public Long map(Long value) throws Exception {
                         return value;
                     }
-                })
+                }).startNewChain()

                 // populate the coordinate directory so we can proceed to TaskManager failure
-                .map(new StatefulMapper(coordinateDir));
+            .map(new StatefulMapper(coordinateDir));

         //write result to temporary file
-        result.addSink(new RichSinkFunction<Long>() {
-
-            // the sink needs to do its write operations synchronized with
-            // the disk FS, otherwise the process kill will discard data
-            // in buffers in the process
-            private transient FileChannel writer;
-
-            @Override
-            public void open(Configuration parameters) throws IOException {
-
-                int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
-                File output = new File(tempTestOutput, "task-" + taskIndex + "-" + UUID.randomUUID().toString());
-
-                // "rws" causes writes to go synchronously to the filesystem, nothing is cached
-                RandomAccessFile outputFile = new RandomAccessFile(output, "rws");
-                this.writer = outputFile.getChannel();
-            }
-
-            @Override
-            public void invoke(Long value) throws Exception {
-                String text = value + "\n";
-                byte[] bytes = text.getBytes(Charset.defaultCharset());
-                ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                writer.write(buffer);
-            }
-
-            @Override
-            public void close() throws Exception {
-                writer.close();
-            }
-        });
+        result.addSink(new CheckpointedSink(DATA_COUNT));

         try {
             // blocking call until execution is done
             env.execute();

-            // validate
-            fileBatchHasEveryNumberLower(PARALLELISM, DATA_COUNT, tempTestOutput);
-
             // TODO: Figure out why this fails when ran with other tests
             // Check whether checkpoints have been cleaned up properly
             // assertDirectoryEmpty(tempCheckpointDir);
         }
         finally {
             // clean up
-            if (tempTestOutput.exists()) {
-                FileUtils.deleteDirectory(tempTestOutput);
-            }
             if (tempCheckpointDir.exists()) {
                 FileUtils.deleteDirectory(tempCheckpointDir);
             }
@@ -154,18 +107,16 @@ public void close() throws Exception {

     public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
             implements Checkpointed<Long> {

+        private static final long serialVersionUID = 1L;
         private static final long SLEEP_TIME = 50;

         private final File coordinateDir;
         private final long end;

-        private long toCollect;
         private long collected;
-        private boolean checkForProceedFile;
-        private File proceedFile;
-        private long stepSize;
-        private long congruence;
+
+        private volatile boolean isRunning = true;

         public SleepyDurableGenerateSequence(File coordinateDir, long end) {
             this.coordinateDir = coordinateDir;
@@ -173,37 +124,39 @@ public SleepyDurableGenerateSequence(File coordinateDir, long end) {
         }

         @Override
-        public void open(Configuration config) {
-            stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
-            congruence = getRuntimeContext().getIndexOfThisSubtask();
-            toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
-            collected = 0L;
-
-            proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
-            checkForProceedFile = true;
-        }
-
-        @Override
-        public boolean reachedEnd() throws Exception {
-            return collected >= toCollect;
-        }
+        public void run(Object checkpointLock, Collector<Long> collector) throws Exception {
+
+            StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+
+            final long stepSize = context.getNumberOfParallelSubtasks();
+            final long congruence = context.getIndexOfThisSubtask();
+            final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+
+            final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
+            boolean checkForProceedFile = true;
+
+            while (isRunning && collected < toCollect) {
+                // check if the proceed file exists (then we go full speed)
+                // if not, we always recheck and sleep
+                if (checkForProceedFile) {
+                    if (proceedFile.exists()) {
+                        checkForProceedFile = false;
+                    } else {
+                        // otherwise wait so that we make slow progress
+                        Thread.sleep(SLEEP_TIME);
+                    }
+                }

-        @Override
-        public Long next() throws Exception {
-            // check if the proceed file exists (then we go full speed)
-            // if not, we always recheck and sleep
-            if (checkForProceedFile) {
-                if (proceedFile.exists()) {
-                    checkForProceedFile = false;
-                } else {
-                    // otherwise wait so that we make slow progress
-                    Thread.sleep(SLEEP_TIME);
+                synchronized (checkpointLock) {
+                    collector.collect(collected * stepSize + congruence);
+                    collected++;
                 }
             }
+        }

-            long result = collected * stepSize + congruence;
-            collected++;
-            return result;
+        @Override
+        public void cancel() {
+            isRunning = false;
         }

         @Override
@@ -216,7 +169,6 @@ public void restoreState(Long state) {
             collected = state;
         }
     }
-
     public static class StatefulMapper extends RichMapFunction<Long, Long> implements Checkpointed<Integer> {

         private boolean markerCreated = false;
" + expected + " != " + value, value.equals(expected)); - BufferedReader bufferedReader = new BufferedReader(new FileReader(file)); + collected++; - String line; - while ((line = bufferedReader.readLine()) != null) { - int num = Integer.parseInt(line); - set.add(num); + if (collected > toCollect) { + Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect); } - bufferedReader.close(); } - for (int i = 0; i < numbers; i++) { - if (!set.contains(i)) { - fail("Missing number: " + i); - } + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return collected; + } + + @Override + public void restoreState(Long state) { + collected = state; } - } - - private static void assertDirectoryEmpty(File path){ - File[] files = path.listFiles(); - assertNotNull(files); - assertEquals("Checkpoint dir is not empty", 0, files.length); } }