Skip to content

Commit

Permalink
[streaming] Re-enable Checkpointed interface for drawing snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jun 25, 2015
1 parent 474ff4d commit 0ae1758
Show file tree
Hide file tree
Showing 18 changed files with 256 additions and 94 deletions.
48 changes: 44 additions & 4 deletions docs/apis/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1188,13 +1188,15 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
Stateful computation
------------

Flink supports the checkpointing and persistence of user defined operator state, so in case of a failure this state can be restored to the latest checkpoint and the processing will continue from there. This gives exactly once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka.
Flink supports the checkpointing and persistence of user defined operator states, so in case of a failure this state can be restored to the latest checkpoint and the processing will continue from there. This gives exactly once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka.

Flink supports two ways of accessing operator states: partitioned and non-partitioned state access.
### OperatorState

In case of non-partitioned state access, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.
Flink supports two types of operator states: partitioned and non-partitioned states.

In case of of partitioned `OperatorState` a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.
In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.

In case of of partitioned operator state a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.

Checkpointing of the states needs to be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` where additional parameters can be passed to modify the default 5 second checkpoint interval.

Expand Down Expand Up @@ -1264,8 +1266,46 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {

Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.

### Checkpointed interface

Another way of exposing user defined operator state for the Flink runtime for checkpointing is by using the `Checkpointed` interface.

When the user defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)` methods will be executed to draw and restore function state.

For example the same counting, reduce function shown for `OperatorState`s by using the `Checkpointed` interface instead:

{% highlight java %}
public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {

//persistent counter
private long counter = 0;

@Override
public Long reduce(Long value1, Long value2) throws Exception {
counter++;
return value1 + value2;
}

// regularly persists state during normal operation
@Override
public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return new Long(counter);
}

// restores state on recovery from failure
@Override
public void restoreState(Serializable state) {
counter = (Long) state;
}
}
{% endhighlight %}

### State checkpoints in iterative jobs

Fink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`.

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

[Back to top](#top)

Lambda expressions with Java 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.functions;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -193,9 +194,11 @@ public interface RuntimeContext {
* The {@link StateCheckpointer} that will be used to draw
* snapshots from the user state.
* @return The {@link OperatorState} for the underlying operator.
*
* @throws IOException Thrown if the system cannot access the state.
*/
<S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
boolean partitioned, StateCheckpointer<S, C> checkpointer);
boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException;

/**
* Returns the {@link OperatorState} with the given name of the underlying
Expand All @@ -220,7 +223,9 @@ <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S def
* Sets whether partitioning should be applied for the given
* state. If true a partitioner key must be used.
* @return The {@link OperatorState} for the underlying operator.
*
* @throws IOException Thrown if the system cannot access the state.
*/
<S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
boolean partitioned);
boolean partitioned) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.functions.util;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -175,13 +176,13 @@ private <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name

@Override
public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) {
S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
}

@Override
public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
boolean partitioned) {
boolean partitioned) throws IOException{
throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.api.common.state;

import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;

/**
Expand Down Expand Up @@ -45,8 +47,10 @@ public interface OperatorState<T> {
* independent state for each partition.
*
* @return The operator state corresponding to the current input.
*
* @throws IOException Thrown if the system cannot access the state.
*/
T getState();
T getState() throws IOException;

/**
* Updates the operator state accessible by {@link #getState()} to the given
Expand All @@ -55,7 +59,9 @@ public interface OperatorState<T> {
*
* @param state
* The new state.
*
* @throws IOException Thrown if the system cannot access the state.
*/
void updateState(T state);
void updateState(T state) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -47,7 +46,6 @@
import kafka.server.KafkaServer;

import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -77,7 +75,6 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.streaming.api.functions.source;


import java.io.IOException;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -73,7 +75,7 @@ public void run(SourceContext<Long> ctx) throws Exception {
}

@Override
public void open(Configuration conf){
public void open(Configuration conf) throws IOException{
collected = getRuntimeContext().getOperatorState("collected", 0L, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.PartitionedStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.state.StreamOperatorState;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

Expand Down Expand Up @@ -72,25 +75,36 @@ public void close() throws Exception {
FunctionUtils.closeFunction(userFunction);
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void restoreInitialState(Map<String, PartitionedStateHandle> snapshots) throws Exception {
// We iterate over the states registered for this operator, initialize and restore it
for (Entry<String, PartitionedStateHandle> snapshot : snapshots.entrySet()) {
Map<Serializable, StateHandle<Serializable>> handles = snapshot.getValue().getState();
StreamOperatorState restoredState = runtimeContext.getState(snapshot.getKey(),
!(handles instanceof ImmutableMap));
restoredState.restoreState(snapshot.getValue().getState());
public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> snapshots) throws Exception {
// Restore state using the Checkpointed interface
if (userFunction instanceof Checkpointed) {
((Checkpointed) userFunction).restoreState(snapshots.f0.getState());
}

if (snapshots.f1 != null) {
// We iterate over the states registered for this operator, initialize and restore it
for (Entry<String, PartitionedStateHandle> snapshot : snapshots.f1.entrySet()) {
Map<Serializable, StateHandle<Serializable>> handles = snapshot.getValue().getState();
StreamOperatorState restoredState = runtimeContext.getState(snapshot.getKey(),
!(handles instanceof ImmutableMap));
restoredState.restoreState(snapshot.getValue().getState());
}
}

}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Map<String, PartitionedStateHandle> getStateSnapshotFromFunction(long checkpointId, long timestamp)
public Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
throws Exception {
// Get all the states for the operator
Map<String, StreamOperatorState> operatorStates = runtimeContext.getOperatorStates();

Map<String, PartitionedStateHandle> operatorStateSnapshots;
if (operatorStates.isEmpty()) {
// We return null to signal that there is nothing to checkpoint
return null;
operatorStateSnapshots = null;
} else {
// Checkpoint the states and store the handles in a map
Map<String, PartitionedStateHandle> snapshots = new HashMap<String, PartitionedStateHandle>();
Expand All @@ -100,7 +114,22 @@ public Map<String, PartitionedStateHandle> getStateSnapshotFromFunction(long che
new PartitionedStateHandle(state.getValue().snapshotState(checkpointId, timestamp)));
}

return snapshots;
operatorStateSnapshots = snapshots;
}

StateHandle<Serializable> checkpointedSnapshot = null;

if (userFunction instanceof Checkpointed) {
StateHandleProvider<Serializable> provider = runtimeContext.getStateHandleProvider();
checkpointedSnapshot = provider.createStateHandle(((Checkpointed) userFunction)
.snapshotState(checkpointId, timestamp));
}

if (operatorStateSnapshots != null || checkpointedSnapshot != null) {
return new Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>(
checkpointedSnapshot, operatorStateSnapshots);
} else {
return null;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.PartitionedStateHandle;
import org.apache.flink.runtime.state.StateHandle;

Expand All @@ -31,9 +32,9 @@
*/
public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {

void restoreInitialState(Map<String, PartitionedStateHandle> state) throws Exception;
void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> state) throws Exception;

Map<String, PartitionedStateHandle> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;

void confirmCheckpointCompleted(long checkpointId, String stateName, StateHandle<Serializable> checkpointedState) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,10 @@ public boolean containsKey(Serializable key) {
public void setCheckPointer(StateCheckpointer<S, C> checkpointer) {
this.checkpointer = checkpointer;
}

@Override
public String toString() {
return fetchedState.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.state;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

Expand Down Expand Up @@ -68,9 +69,9 @@ public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
}

@Override
public S getState() {
public S getState() throws IOException{
if (currentInput == null) {
return null;
throw new IllegalStateException("Need a valid input for accessing the state.");
} else {
try {
Serializable key = keySelector.getKey(currentInput);
Expand All @@ -80,23 +81,23 @@ public S getState() {
return defaultState;
}
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException("User-defined key selector threw an exception.");
}
}
}

@Override
public void updateState(S state) {
public void updateState(S state) throws IOException {
if (state == null) {
throw new RuntimeException("Cannot set state to null.");
}
if (currentInput == null) {
throw new RuntimeException("Need a valid input for updating a state.");
throw new IllegalStateException("Need a valid input for updating a state.");
} else {
try {
stateStore.setStateForKey(keySelector.getKey(currentInput), state);
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException("User-defined key selector threw an exception.");
}
}
}
Expand Down Expand Up @@ -125,5 +126,10 @@ public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exc
public Map<Serializable, S> getPartitionedState() throws Exception {
return stateStore.getPartitionedState();
}

@Override
public String toString() {
return stateStore.toString();
}

}
Loading

0 comments on commit 0ae1758

Please sign in to comment.