Skip to content

Commit

Permalink
[FLINK-5113] Port functions in tests to new CheckpointedFunction IF.
Browse files Browse the repository at this point in the history
This closes apache#2939.
  • Loading branch information
kl0u authored and zentol committed Jan 19, 2017
1 parent 570dbc8 commit 525edf1
Show file tree
Hide file tree
Showing 26 changed files with 608 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

/**
Expand All @@ -50,6 +52,9 @@ public static void main(String[] args) throws Exception {
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.enableWriteAheadLog()
.setClusterBuilder(new ClusterBuilder() {

private static final long serialVersionUID = 2793938419775311824L;

@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
Expand All @@ -62,7 +67,9 @@ public Cluster buildCluster(Cluster.Builder builder) {
env.execute();
}

public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
private static final long serialVersionUID = 4022367939215095610L;

private int counter = 0;
private boolean stop = false;

Expand All @@ -84,13 +91,16 @@ public void cancel() {
}

@Override
public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return counter;
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.counter);
}

@Override
public void restoreState(Integer state) throws Exception {
this.counter = state;
public void restoreState(List<Integer> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
}
this.counter = state.get(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand All @@ -42,7 +42,9 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -236,7 +238,7 @@ public String map(String value) throws Exception {
}

private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
implements CheckpointedAsynchronously<Integer> {
implements ListCheckpointed<Integer> {

private static final long serialVersionUID = 1L;

Expand All @@ -246,7 +248,6 @@ private static class StringGeneratingSourceFunction extends RichParallelSourceFu

private volatile boolean isRunning = true;


StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
Expand Down Expand Up @@ -288,13 +289,16 @@ private static String randomString(StringBuilder bld, Random rnd) {
}

@Override
public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
return index;
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.index);
}

@Override
public void restoreState(Integer state) {
index = state;
public void restoreState(List<Integer> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
}
this.index = state.get(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand All @@ -42,7 +42,9 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -233,7 +235,7 @@ public String map(String value) throws Exception {
}

private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
implements CheckpointedAsynchronously<Integer> {
implements ListCheckpointed<Integer> {

private static final long serialVersionUID = 1L;

Expand All @@ -243,7 +245,6 @@ private static class StringGeneratingSourceFunction extends RichParallelSourceFu

private volatile boolean isRunning = true;


StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
Expand Down Expand Up @@ -285,13 +286,16 @@ private static String randomString(StringBuilder bld, Random rnd) {
}

@Override
public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
return index;
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(index);
}

@Override
public void restoreState(Integer state) {
index = state;
public void restoreState(List<Integer> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
}
this.index = state.get(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,23 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
Expand Down Expand Up @@ -1925,7 +1919,7 @@ private static void printTopic(String topicName, int elements,DeserializationSch


public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
implements Checkpointed<Integer>, CheckpointListener {
implements ListCheckpointed<Integer>, CheckpointListener {

private static final long serialVersionUID = 6334389850158707313L;

Expand All @@ -1939,7 +1933,6 @@ public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
private boolean failer;
private boolean hasBeenCheckpointed;


public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
this.shutdownBrokerId = shutdownBrokerId;
this.failCount = failCount;
Expand Down Expand Up @@ -1994,13 +1987,16 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
return numElementsTotal;
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.numElementsTotal);
}

@Override
public void restoreState(Integer state) {
this.numElementsTotal = state;
public void restoreState(List<Integer> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
}
this.numElementsTotal = state.get(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;


public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
Checkpointed<Integer>, CheckpointListener, Runnable {
ListCheckpointed<Integer>, CheckpointListener, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);

Expand Down Expand Up @@ -89,13 +92,16 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
return numElementsTotal;
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(numElementsTotal);
}

@Override
public void restoreState(Integer state) {
numElementsTotal = state;
public void restoreState(List<Integer> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
}
this.numElementsTotal = state.get(0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
package org.apache.flink.streaming.connectors.kafka.testutils;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.SuccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.BitSet;
import java.util.Collections;
import java.util.List;

public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {

private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);

Expand All @@ -39,7 +41,6 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme

private int numElements; // this is checkpointed


public ValidatingExactlyOnceSink(int numElementsTotal) {
this.numElementsTotal = numElementsTotal;
}
Expand Down Expand Up @@ -68,15 +69,20 @@ else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
}

@Override
public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
return new Tuple2<>(numElements, duplicateChecker);
public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception {
LOG.info("Snapshot of counter " + numElements + " at checkpoint " + checkpointId);
return Collections.singletonList(new Tuple2<>(numElements, duplicateChecker));
}

@Override
public void restoreState(Tuple2<Integer, BitSet> state) {
LOG.info("restoring num elements to {}", state.f0);
this.numElements = state.f0;
this.duplicateChecker = state.f1;
public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
}

Tuple2<Integer, BitSet> s = state.get(0);
LOG.info("restoring num elements to {}", s.f0);
this.numElements = s.f0;
this.duplicateChecker = s.f1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ public static boolean isNullOrEmpty(Collection<?> collection) {
public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
* state is written, the function is not called, so the function needs not return a
* copy of its state, but may return a reference to its state. Functions that can
* continue to work and mutate the state, even while the state snapshot is being accessed,
* can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
* interface.</p>
* can implement the {@link CheckpointedAsynchronously} interface.</p>
*
* @param <T> The type of the operator state.
*/
Expand Down
Loading

0 comments on commit 525edf1

Please sign in to comment.