From 0d2c49005449d6e05bbf53446edac928bb7ecbb6 Mon Sep 17 00:00:00 2001
From: Gyula Fora
Date: Tue, 7 Jul 2015 11:07:05 +0200
Subject: [PATCH] [FLINK-2323] [api-breaking] Rename OperatorState interface
 methods to value() and update(..)

Closes #890
---
 docs/apis/streaming_guide.md                  |  6 ++---
 .../api/common/functions/RuntimeContext.java  |  8 +++----
 .../flink/api/common/state/OperatorState.java | 22 +++++++++----------
 .../api/persistent/PersistentKafkaSource.java | 16 +++++++-------
 .../source/StatefulSequenceSource.java        |  6 ++---
 .../state/PartitionedStreamOperatorState.java |  4 ++--
 .../api/state/StreamOperatorState.java        | 12 +++++-----
 .../api/state/StatefulOperatorTest.java       | 22 +++++++++----------
 .../StreamCheckpointingITCase.java            | 16 +++++++-------
 ...ProcessFailureStreamingRecoveryITCase.java |  6 ++---
 10 files changed, 59 insertions(+), 59 deletions(-)

diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c612b6959661a..7d8ab6d0840c0 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1220,7 +1220,7 @@ Flink supports the checkpointing and persistence of user defined operator states
 
 Flink supports two types of operator states: partitioned and non-partitioned states.
 
-In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.
+In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.value()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `value()` will return number of inputs processed by each parallel mapper.
 
 In case of of partitioned operator state a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.
 
@@ -1244,7 +1244,7 @@ public class CounterSum implements RichReduceFunction {
 
     @Override
     public Long reduce(Long value1, Long value2) throws Exception {
-        counter.updateState(counter.getState() + 1);
+        counter.update(counter.value() + 1);
         return value1 + value2;
     }
 
@@ -1275,7 +1275,7 @@ public static class CounterSource implements RichParallelSourceFunction {
             // output and state update are atomic
             synchronized (lock){
                 ctx.collect(offset);
-                offset.updateState(offset.getState() + 1);
+                offset.update(offset.value() + 1);
             }
         }
     }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 4c8e9247ceb72..eb84d1c20c34e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -184,9 +184,9 @@ public interface RuntimeContext {
 	 *            can be used by the same operator.
 	 * @param defaultState
 	 *            Default value for the operator state. This will be returned
-	 *            the first time {@link OperatorState#getState()} (for every
+	 *            the first time {@link OperatorState#value()} (for every
 	 *            state partition) is called before
-	 *            {@link OperatorState#updateState(Object)}.
+	 *            {@link OperatorState#update(Object)}.
 	 * @param partitioned
 	 *            Sets whether partitioning should be applied for the given
 	 *            state. If true a partitioner key must be used.
@@ -216,9 +216,9 @@ OperatorState getOperatorState(String name, S def
 	 *            can be used by the same operator.
 	 * @param defaultState
 	 *            Default value for the operator state. This will be returned
-	 *            the first time {@link OperatorState#getState()} (for every
+	 *            the first time {@link OperatorState#value()} (for every
 	 *            state partition) is called before
-	 *            {@link OperatorState#updateState(Object)}.
+	 *            {@link OperatorState#update(Object)}.
 	 * @param partitioned
 	 *            Sets whether partitioning should be applied for the given
 	 *            state. If true a partitioner key must be used.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 4198a50cde142..955b35bc5782f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -27,8 +27,8 @@
  * partitioned (when state partitioning is defined in the program) or
  * non-partitioned user states.
  * 
- * State can be accessed and manipulated using the {@link #getState()} and
- * {@link #updateState(T)} methods. These calls are only safe in the
+ * State can be accessed and manipulated using the {@link #value()} and
+ * {@link #update(T)} methods. These calls are only safe in the
  * transformation call the operator represents, for instance inside
  * {@link MapFunction#map()} and can lead tp unexpected behavior in the
  * {@link #open(org.apache.flink.configuration.Configuration)} or
@@ -40,28 +40,28 @@ public interface OperatorState {
 
 	/**
-	 * Gets the current state for the operator. When the state is not
-	 * partitioned the returned state is the same for all inputs in a given
-	 * operator instance. If state partitioning is applied, the state returned
+	 * Returns the current value for the state. When the state is not
+	 * partitioned the returned value is the same for all inputs in a given
+	 * operator instance. If state partitioning is applied, the value returned
 	 * depends on the current operator input, as the operator maintains an
 	 * independent state for each partition.
 	 * 
-	 * @return The operator state corresponding to the current input.
+	 * @return The operator state value corresponding to the current input.
 	 * 
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
-	T getState() throws IOException;
+	T value() throws IOException;
 
 	/**
-	 * Updates the operator state accessible by {@link #getState()} to the given
-	 * value. The next time {@link #getState()} is called (for the same state
+	 * Updates the operator state accessible by {@link #value()} to the given
+	 * value. The next time {@link #value()} is called (for the same state
 	 * partition) the returned state will represent the updated value.
 	 * 
 	 * @param state
-	 *            The new state.
+	 *            The new value for the state.
 	 * 
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
-	void updateState(T state) throws IOException;
+	void update(T value) throws IOException;
 
 }
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index befbef6cc9dad..6758f2ca9d584 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -148,17 +148,17 @@ public void open(Configuration parameters) throws Exception {
 		this.lastOffsets = getRuntimeContext().getOperatorState("offset", new long[numPartitions], false);
 		this.commitedOffsets = new long[numPartitions];
 		// check if there are offsets to restore
-		if (!Arrays.equals(lastOffsets.getState(), new long[numPartitions])) {
-			if (lastOffsets.getState().length != numPartitions) {
-				throw new IllegalStateException("There are "+lastOffsets.getState().length+" offsets to restore for topic "+topicName+" but " +
+		if (!Arrays.equals(lastOffsets.value(), new long[numPartitions])) {
+			if (lastOffsets.value().length != numPartitions) {
+				throw new IllegalStateException("There are "+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " +
 						"there are only "+numPartitions+" in the topic");
 			}
 
-			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.getState()));
-			setOffsetsInZooKeeper(lastOffsets.getState());
+			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.value()));
+			setOffsetsInZooKeeper(lastOffsets.value());
 		} else {
 			// initialize empty offsets
-			Arrays.fill(this.lastOffsets.getState(), -1);
+			Arrays.fill(this.lastOffsets.value(), -1);
 		}
 
 		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
@@ -175,7 +175,7 @@ public void run(SourceContext ctx) throws Exception {
 		while (running && iteratorToRead.hasNext()) {
 			MessageAndMetadata message = iteratorToRead.next();
 
-			if(lastOffsets.getState()[message.partition()] >= message.offset()) {
+			if(lastOffsets.value()[message.partition()] >= message.offset()) {
 				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
 				continue;
 			}
@@ -188,7 +188,7 @@ public void run(SourceContext ctx) throws Exception {
 
 			// make the state update and the element emission atomic
 			synchronized (checkpointLock) {
-				lastOffsets.getState()[message.partition()] = message.offset();
+				lastOffsets.value()[message.partition()] = message.offset();
 				ctx.collect(next);
 			}
 
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 9a2ba4c0ff179..2d74e38f5c35f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -63,14 +63,14 @@ public void run(SourceContext ctx) throws Exception {
 				((end - start + 1) / stepSize + 1) :
 				((end - start + 1) / stepSize);
 
-		Long currentCollected = collected.getState();
+		Long currentCollected = collected.value();
 
 		while (isRunning && currentCollected < toCollect) {
 			synchronized (checkpointLock) {
 				ctx.collect(currentCollected * stepSize + congruence);
-				collected.updateState(currentCollected + 1);
+				collected.update(currentCollected + 1);
 			}
-			currentCollected = collected.getState();
+			currentCollected = collected.value();
 		}
 	}
 
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 808b7c846b7e6..bfc160f6fae8b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -69,7 +69,7 @@ public PartitionedStreamOperatorState(StateHandleProvider provider,
 	}
 
 	@Override
-	public S getState() throws IOException{
+	public S value() throws IOException{
 		if (currentInput == null) {
 			throw new IllegalStateException("Need a valid input for accessing the state.");
 		} else {
@@ -87,7 +87,7 @@ public S getState() throws IOException{
 	}
 
 	@Override
-	public void updateState(S state) throws IOException {
+	public void update(S state) throws IOException {
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 1699c27a561b2..a80d730740206 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -59,12 +59,12 @@ public StreamOperatorState(StateHandleProvider provider) {
 	}
 
 	@Override
-	public S getState() throws IOException {
+	public S value() throws IOException {
 		return state;
 	}
 
 	@Override
-	public void updateState(S state) throws IOException {
+	public void update(S state) throws IOException {
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}
@@ -72,8 +72,8 @@ public void updateState(S state) throws IOException {
 	}
 
 	public void setDefaultState(S defaultState) throws IOException {
-		if (getState() == null) {
-			updateState(defaultState);
+		if (value() == null) {
+			update(defaultState);
 		}
 	}
 
@@ -92,12 +92,12 @@ protected StateHandleProvider getStateHandleProvider() {
 
 	public Map> snapshotState(long checkpointId,
 			long checkpointTimestamp) throws Exception {
 		return ImmutableMap.of(DEFAULTKEY, provider.createStateHandle(checkpointer.snapshotState(
-				getState(), checkpointId, checkpointTimestamp)));
+				value(), checkpointId, checkpointTimestamp)));
 	}
 
 	public void restoreState(Map> snapshots) throws Exception {
-		updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
+		update(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
 	}
 
 	public Map getPartitionedState() throws Exception {
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 774b43119bf53..8c7ffebb4f4d1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -71,9 +71,9 @@ public void simpleStateTest() throws Exception {
 		processInputs(map, Arrays.asList(1, 2, 3, 4, 5));
 
 		assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
-		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).getState());
+		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
-		assertEquals("12345", context.getOperatorState("concat", "", false).getState());
+		assertEquals("12345", context.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
 
 		byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
@@ -81,19 +81,19 @@ public void simpleStateTest() throws Exception {
 		StreamMap restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
 		StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
 
-		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).getState());
+		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
-		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).getState());
+		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
 
 		out.clear();
 
 		processInputs(restoredMap, Arrays.asList(7, 8));
 
 		assertEquals(Arrays.asList("7", "8"), out);
-		assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).getState());
+		assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
 				.getPartitionedState());
-		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).getState());
+		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
 	}
 
@@ -176,12 +176,12 @@ public static class StatefulMapper extends RichMapFunction impl
 
 		@Override
 		public String map(Integer value) throws Exception {
-			counter.updateState(counter.getState() + 1);
-			groupCounter.updateState(groupCounter.getState() + 1);
-			concat.updateState(concat.getState() + value.toString());
+			counter.update(counter.value() + 1);
+			groupCounter.update(groupCounter.value() + 1);
+			concat.update(concat.value() + value.toString());
 			checkpointedCounter++;
 			try {
-				counter.updateState(null);
+				counter.update(null);
 				fail();
 			} catch (RuntimeException e){
 			}
@@ -235,7 +235,7 @@ public static class StatefulMapper2 extends RichMapFunction {
 
 		@Override
 		public String map(Integer value) throws Exception {
-			groupCounter.updateState(groupCounter.getState() + 1);
+			groupCounter.update(groupCounter.value() + 1);
 			return value.toString();
 		}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index e2430d6cfef78..a826effe0d5ec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -198,7 +198,7 @@ private static class StringGeneratingSourceFunction extends RichSourceFunction
 		public void run(SourceContext ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index.getState() < numElements) {
-				char first = (char) ((index.getState() % 40) + 40);
+			while (isRunning && index.value() < numElements) {
+				char first = (char) ((index.value() % 40) + 40);
 				stringBuilder.setLength(0);
 				stringBuilder.append(first);
@@ -231,7 +231,7 @@ public void run(SourceContext ctx) throws Exception {
 				String result = randomString(stringBuilder, rnd);
 
 				synchronized (lockingObject) {
-					index.updateState(index.getState() + step);
+					index.update(index.value() + step);
 					ctx.collect(result);
 				}
 			}
@@ -261,7 +261,7 @@ private static class StatefulCounterFunction extends RichMapFunction
 sourceCtx) throws Exception {
 		final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
 		boolean checkForProceedFile = true;
-		while (isRunning && collected.getState() < toCollect) {
+		while (isRunning && collected.value() < toCollect) {
 			// check if the proceed file exists (then we go full speed)
 			// if not, we always recheck and sleep
 			if (checkForProceedFile) {
@@ -146,8 +146,8 @@ public void run(SourceContext sourceCtx) throws Exception {
 			}
 
 			synchronized (checkpointLock) {
-				sourceCtx.collect(collected.getState() * stepSize + congruence);
-				collected.updateState(collected.getState() + 1);
+				sourceCtx.collect(collected.value() * stepSize + congruence);
+				collected.update(collected.value() + 1);
 			}
 		}
 	}
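
Example usage of the renamed API (illustrative only, not part of the patch): a minimal sketch of a user function written against the OperatorState interface as it stands after this change, modeled on the CounterSum example in docs/apis/streaming_guide.md above. The class name CountingMapper and the state name "counter" are made up for illustration.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.OperatorState;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical user function: counts processed elements in a
    // non-partitioned OperatorState using the renamed methods.
    public class CountingMapper extends RichMapFunction<Long, Long> {

        private OperatorState<Long> counter;

        @Override
        public void open(Configuration config) throws Exception {
            // state name, default value, partitioned = false
            counter = getRuntimeContext().getOperatorState("counter", 0L, false);
        }

        @Override
        public Long map(Long input) throws Exception {
            // value() and update(..) replace the former getState() and updateState(..)
            counter.update(counter.value() + 1);
            return input;
        }
    }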