Skip to content

Commit

Permalink
[FLINK-2323] [api-breaking] Rename OperatorState interface methods to…
Browse files Browse the repository at this point in the history
… value() and update(..)

Closes apache#890
  • Loading branch information
gyfora committed Jul 7, 2015
1 parent 0e21941 commit 0d2c490
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 59 deletions.
6 changes: 3 additions & 3 deletions docs/apis/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ Flink supports the checkpointing and persistence of user defined operator states

Flink supports two types of operator states: partitioned and non-partitioned states.

In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.
In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.value()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `value()` will return number of inputs processed by each parallel mapper.

In case of of partitioned operator state a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.

Expand All @@ -1244,7 +1244,7 @@ public class CounterSum implements RichReduceFunction<Long> {

@Override
public Long reduce(Long value1, Long value2) throws Exception {
counter.updateState(counter.getState() + 1);
counter.update(counter.value() + 1);
return value1 + value2;
}

Expand Down Expand Up @@ -1275,7 +1275,7 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {
// output and state update are atomic
synchronized (lock){
ctx.collect(offset);
offset.updateState(offset.getState() + 1);
offset.update(offset.value() + 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ public interface RuntimeContext {
* can be used by the same operator.
* @param defaultState
* Default value for the operator state. This will be returned
* the first time {@link OperatorState#getState()} (for every
* the first time {@link OperatorState#value()} (for every
* state partition) is called before
* {@link OperatorState#updateState(Object)}.
* {@link OperatorState#update(Object)}.
* @param partitioned
* Sets whether partitioning should be applied for the given
* state. If true a partitioner key must be used.
Expand Down Expand Up @@ -216,9 +216,9 @@ <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S def
* can be used by the same operator.
* @param defaultState
* Default value for the operator state. This will be returned
* the first time {@link OperatorState#getState()} (for every
* the first time {@link OperatorState#value()} (for every
* state partition) is called before
* {@link OperatorState#updateState(Object)}.
* {@link OperatorState#update(Object)}.
* @param partitioned
* Sets whether partitioning should be applied for the given
* state. If true a partitioner key must be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
* partitioned (when state partitioning is defined in the program) or
* non-partitioned user states.
*
* State can be accessed and manipulated using the {@link #getState()} and
* {@link #updateState(T)} methods. These calls are only safe in the
* State can be accessed and manipulated using the {@link #value()} and
* {@link #update(T)} methods. These calls are only safe in the
* transformation call the operator represents, for instance inside
* {@link MapFunction#map()} and can lead tp unexpected behavior in the
* {@link #open(org.apache.flink.configuration.Configuration)} or
Expand All @@ -40,28 +40,28 @@
public interface OperatorState<T> {

/**
* Gets the current state for the operator. When the state is not
* partitioned the returned state is the same for all inputs in a given
* operator instance. If state partitioning is applied, the state returned
* Returns the current value for the state. When the state is not
* partitioned the returned value is the same for all inputs in a given
* operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
*
* @return The operator state corresponding to the current input.
* @return The operator state value corresponding to the current input.
*
* @throws IOException Thrown if the system cannot access the state.
*/
T getState() throws IOException;
T value() throws IOException;

/**
* Updates the operator state accessible by {@link #getState()} to the given
* value. The next time {@link #getState()} is called (for the same state
* Updates the operator state accessible by {@link #value()} to the given
* value. The next time {@link #value()} is called (for the same state
* partition) the returned state will represent the updated value.
*
* @param state
* The new state.
* The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
*/
void updateState(T state) throws IOException;
void update(T value) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,17 @@ public void open(Configuration parameters) throws Exception {
this.lastOffsets = getRuntimeContext().getOperatorState("offset", new long[numPartitions], false);
this.commitedOffsets = new long[numPartitions];
// check if there are offsets to restore
if (!Arrays.equals(lastOffsets.getState(), new long[numPartitions])) {
if (lastOffsets.getState().length != numPartitions) {
throw new IllegalStateException("There are "+lastOffsets.getState().length+" offsets to restore for topic "+topicName+" but " +
if (!Arrays.equals(lastOffsets.value(), new long[numPartitions])) {
if (lastOffsets.value().length != numPartitions) {
throw new IllegalStateException("There are "+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " +
"there are only "+numPartitions+" in the topic");
}

LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.getState()));
setOffsetsInZooKeeper(lastOffsets.getState());
LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.value()));
setOffsetsInZooKeeper(lastOffsets.value());
} else {
// initialize empty offsets
Arrays.fill(this.lastOffsets.getState(), -1);
Arrays.fill(this.lastOffsets.value(), -1);
}
Arrays.fill(this.commitedOffsets, 0); // just to make it clear

Expand All @@ -175,7 +175,7 @@ public void run(SourceContext<OUT> ctx) throws Exception {

while (running && iteratorToRead.hasNext()) {
MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
if(lastOffsets.getState()[message.partition()] >= message.offset()) {
if(lastOffsets.value()[message.partition()] >= message.offset()) {
LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
continue;
}
Expand All @@ -188,7 +188,7 @@ public void run(SourceContext<OUT> ctx) throws Exception {

// make the state update and the element emission atomic
synchronized (checkpointLock) {
lastOffsets.getState()[message.partition()] = message.offset();
lastOffsets.value()[message.partition()] = message.offset();
ctx.collect(next);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public void run(SourceContext<Long> ctx) throws Exception {
((end - start + 1) / stepSize + 1) :
((end - start + 1) / stepSize);

Long currentCollected = collected.getState();
Long currentCollected = collected.value();

while (isRunning && currentCollected < toCollect) {
synchronized (checkpointLock) {
ctx.collect(currentCollected * stepSize + congruence);
collected.updateState(currentCollected + 1);
collected.update(currentCollected + 1);
}
currentCollected = collected.getState();
currentCollected = collected.value();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
}

@Override
public S getState() throws IOException{
public S value() throws IOException{
if (currentInput == null) {
throw new IllegalStateException("Need a valid input for accessing the state.");
} else {
Expand All @@ -87,7 +87,7 @@ public S getState() throws IOException{
}

@Override
public void updateState(S state) throws IOException {
public void update(S state) throws IOException {
if (state == null) {
throw new RuntimeException("Cannot set state to null.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ public StreamOperatorState(StateHandleProvider<C> provider) {
}

@Override
public S getState() throws IOException {
public S value() throws IOException {
return state;
}

@Override
public void updateState(S state) throws IOException {
public void update(S state) throws IOException {
if (state == null) {
throw new RuntimeException("Cannot set state to null.");
}
this.state = state;
}

public void setDefaultState(S defaultState) throws IOException {
if (getState() == null) {
updateState(defaultState);
if (value() == null) {
update(defaultState);
}
}

Expand All @@ -92,12 +92,12 @@ protected StateHandleProvider<C> getStateHandleProvider() {
public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
long checkpointTimestamp) throws Exception {
return ImmutableMap.of(DEFAULTKEY, provider.createStateHandle(checkpointer.snapshotState(
getState(), checkpointId, checkpointTimestamp)));
value(), checkpointId, checkpointTimestamp)));

}

public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
update(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
}

public Map<Serializable, S> getPartitionedState() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,29 @@ public void simpleStateTest() throws Exception {
processInputs(map, Arrays.asList(1, 2, 3, 4, 5));

assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).getState());
assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
assertEquals("12345", context.getOperatorState("concat", "", false).getState());
assertEquals("12345", context.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);

byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));

StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();

assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).getState());
assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
assertEquals("12345", restoredContext.getOperatorState("concat", "", false).getState());
assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
out.clear();

processInputs(restoredMap, Arrays.asList(7, 8));

assertEquals(Arrays.asList("7", "8"), out);
assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).getState());
assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
.getPartitionedState());
assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).getState());
assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);

}
Expand Down Expand Up @@ -176,12 +176,12 @@ public static class StatefulMapper extends RichMapFunction<Integer, String> impl

@Override
public String map(Integer value) throws Exception {
counter.updateState(counter.getState() + 1);
groupCounter.updateState(groupCounter.getState() + 1);
concat.updateState(concat.getState() + value.toString());
counter.update(counter.value() + 1);
groupCounter.update(groupCounter.value() + 1);
concat.update(concat.value() + value.toString());
checkpointedCounter++;
try {
counter.updateState(null);
counter.update(null);
fail();
} catch (RuntimeException e){
}
Expand Down Expand Up @@ -235,7 +235,7 @@ public static class StatefulMapper2 extends RichMapFunction<Integer, String> {

@Override
public String map(Integer value) throws Exception {
groupCounter.updateState(groupCounter.getState() + 1);
groupCounter.update(groupCounter.value() + 1);

return value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private static class StringGeneratingSourceFunction extends RichSourceFunction<S
static final long[] counts = new long[PARALLELISM];
@Override
public void close() throws IOException {
counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState();
counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
}


Expand All @@ -222,16 +222,16 @@ public void open(Configuration parameters) throws IOException {
public void run(SourceContext<String> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();

while (isRunning && index.getState() < numElements) {
char first = (char) ((index.getState() % 40) + 40);
while (isRunning && index.value() < numElements) {
char first = (char) ((index.value() % 40) + 40);

stringBuilder.setLength(0);
stringBuilder.append(first);

String result = randomString(stringBuilder, rnd);

synchronized (lockingObject) {
index.updateState(index.getState() + step);
index.update(index.value() + step);
ctx.collect(result);
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount

@Override
public PrefixCount map(PrefixCount value) throws Exception {
count.updateState(count.getState() + 1);
count.update(count.value() + 1);
return value;
}

Expand All @@ -272,7 +272,7 @@ public void open(Configuration conf) throws IOException {

@Override
public void close() throws IOException {
counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
}

}
Expand Down Expand Up @@ -370,7 +370,7 @@ private static class StringPrefixCountRichMapFunction extends RichMapFunction<St

@Override
public PrefixCount map(String value) throws IOException {
count.updateState(count.getState() + 1);
count.update(count.value() + 1);
return new PrefixCount(value.substring(0, 1), value, 1L);
}

Expand All @@ -381,7 +381,7 @@ public void open(Configuration conf) throws IOException {

@Override
public void close() throws IOException {
counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void run(SourceContext<Long> sourceCtx) throws Exception {
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;

while (isRunning && collected.getState() < toCollect) {
while (isRunning && collected.value() < toCollect) {
// check if the proceed file exists (then we go full speed)
// if not, we always recheck and sleep
if (checkForProceedFile) {
Expand All @@ -146,8 +146,8 @@ public void run(SourceContext<Long> sourceCtx) throws Exception {
}

synchronized (checkpointLock) {
sourceCtx.collect(collected.getState() * stepSize + congruence);
collected.updateState(collected.getState() + 1);
sourceCtx.collect(collected.value() * stepSize + congruence);
collected.update(collected.value() + 1);
}
}
}
Expand Down

0 comments on commit 0d2c490

Please sign in to comment.