Skip to content

Commit

Permalink
[FLINK-4856] Add MapState for keyed state
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaogang.sxg authored and aljoscha committed Feb 23, 2017
1 parent de2605e commit 30c9e2b
Show file tree
Hide file tree
Showing 34 changed files with 2,887 additions and 9 deletions.
8 changes: 7 additions & 1 deletion docs/dev/stream/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe
of elements that are added to the state. The interface is the same as for `ListState` but elements
added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.

* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.

All types of state also have a method `clear()` that clears the state for the currently
active key, i.e. the key of the input element.

Expand All @@ -136,7 +141,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na
that you can reference them), the type of the values that the state holds, and possibly
a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
a `ReducingStateDescriptor` or a `FoldingStateDescriptor`.
a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.

State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for
Expand All @@ -147,6 +152,7 @@ is available in a `RichFunction` has these methods for accessing state:
* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
* `ListState<T> getListState(ListStateDescriptor<T>)`
* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`

This is an example `FlatMapFunction` that shows how all of the parts fit together:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
Expand Down Expand Up @@ -50,7 +53,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
implements InternalKvState<N>, State {

/** Serializer for the namespace */
private final TypeSerializer<N> namespaceSerializer;
final TypeSerializer<N> namespaceSerializer;

/** The current namespace, which the next value methods will refer to */
private N currentNamespace;
Expand Down Expand Up @@ -215,4 +218,48 @@ private static void writeVariableIntBytes(
value >>>= 8;
} while (value != 0);
}

protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
int keyGroup = readKeyGroup(inputView);
K key = readKey(inputStream, inputView);
N namespace = readNamespace(inputStream, inputView);

return new Tuple3<>(keyGroup, key, namespace);
}

private int readKeyGroup(DataInputView inputView) throws IOException {
int keyGroup = 0;
for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) {
keyGroup <<= 8;
keyGroup |= (inputView.readByte() & 0xFF);
}
return keyGroup;
}

private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
int beforeRead = inputStream.getPosition();
K key = backend.getKeySerializer().deserialize(inputView);
if (ambiguousKeyPossible) {
int length = inputStream.getPosition() - beforeRead;
readVariableIntBytes(inputView, length);
}
return key;
}

private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
int beforeRead = inputStream.getPosition();
N namespace = namespaceSerializer.deserialize(inputView);
if (ambiguousKeyPossible) {
int length = inputStream.getPosition() - beforeRead;
readVariableIntBytes(inputView, length);
}
return namespace;
}

private void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
do {
inputView.readByte();
value >>>= 8;
} while (value != 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.SerializableObject;
Expand Down Expand Up @@ -882,6 +884,14 @@ protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}

@Override
protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);

return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
}

/**
* Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
* Used by #MergeIterator.
Expand Down
Loading

0 comments on commit 30c9e2b

Please sign in to comment.