Skip to content

Commit

Permalink
[FLINK-2707] [streaming] Set StateCheckpointer before default state v…
Browse files Browse the repository at this point in the history
…alue
  • Loading branch information
gyfora committed Sep 18, 2015
1 parent b234b0b commit 3e233a3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public <S, C extends Serializable> OperatorState<S> getOperatorState(String name
throw new RuntimeException("Cannot set default state to null.");
}
StreamOperatorState<S, C> state = (StreamOperatorState<S, C>) getState(name, partitioned);
state.setDefaultState(defaultState);
state.setCheckpointer(checkpointer);
state.setDefaultState(defaultState);

return (OperatorState<S>) state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -278,7 +279,19 @@ public String map(Integer value) throws Exception {

@Override
public void open(Configuration conf) throws IOException {
groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true,
new StateCheckpointer<Integer, String>() {

@Override
public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
return state.toString();
}

@Override
public Integer restoreState(String stateSnapshot) {
return Integer.parseInt(stateSnapshot);
}
});
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 3e233a3

Please sign in to comment.