diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index ad6b7c22ec4c6..5f9da7dd810d0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -83,11 +83,11 @@ class RocksDBMapState * @param backend The backend for which this state is bind to. */ private RocksDBMapState( - ColumnFamilyHandle columnFamily, - TypeSerializer namespaceSerializer, - TypeSerializer> valueSerializer, - Map defaultValue, - RocksDBKeyedStateBackend backend) { + ColumnFamilyHandle columnFamily, + TypeSerializer namespaceSerializer, + TypeSerializer> valueSerializer, + Map defaultValue, + RocksDBKeyedStateBackend backend) { super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend); @@ -122,14 +122,14 @@ public UV get(UK userKey) throws IOException, RocksDBException { byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes); - return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes)); + return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer)); } @Override public void put(UK userKey, UV userValue) throws IOException, RocksDBException { byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); - byte[] rawValueBytes = serializeUserValue(userValue); + byte[] rawValueBytes = serializeUserValue(userValue, userValueSerializer, dataOutputView); backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); } @@ -143,7 +143,7 @@ public void putAll(Map map) throws IOException, RocksDBException { try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) { for (Map.Entry entry : map.entrySet()) { byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey()); - byte[] rawValueBytes = serializeUserValue(entry.getValue()); + byte[] rawValueBytes = serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView); writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes); } } @@ -180,7 +180,7 @@ public Iterable> entries() throws IOException { public Iterable keys() throws IOException { final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); - return () -> new RocksDBMapIterator(backend.db, prefixBytes, userKeySerializer, userValueSerializer) { + return () -> new RocksDBMapIterator(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) { @Override public UK next() { RocksDBMapEntry entry = nextEntry(); @@ -193,7 +193,7 @@ public UK next() { public Iterable values() throws IOException { final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); - return () -> new RocksDBMapIterator(backend.db, prefixBytes, userKeySerializer, userValueSerializer) { + return () -> new RocksDBMapIterator(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) { @Override public UV next() { RocksDBMapEntry entry = nextEntry(); @@ -206,7 +206,7 @@ public UV next() { public Iterator> iterator() throws IOException { final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); - return new RocksDBMapIterator>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) { + return new RocksDBMapIterator>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) { @Override public Map.Entry next() { return nextEntry(); @@ -218,7 +218,7 @@ public Map.Entry next() { public void clear() { try { try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily); - WriteBatch writeBatch = new WriteBatch(128)) { + WriteBatch writeBatch = new WriteBatch(128)) { final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace(); iterator.seek(keyPrefixBytes); @@ -243,10 +243,10 @@ public void clear() { @Override @SuppressWarnings("unchecked") public byte[] getSerializedValue( - final byte[] serializedKeyAndNamespace, - final TypeSerializer safeKeySerializer, - final TypeSerializer safeNamespaceSerializer, - final TypeSerializer> safeValueSerializer) throws Exception { + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer> safeValueSerializer) throws Exception { Preconditions.checkNotNull(serializedKeyAndNamespace); Preconditions.checkNotNull(safeKeySerializer); @@ -255,19 +255,20 @@ public byte[] getSerializedValue( //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( - serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); + serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups()); ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128); + ByteArrayDataInputView inputView = new ByteArrayDataInputView(); writeKeyWithGroupAndNamespace( - keyGroup, - keyAndNamespace.f0, - safeKeySerializer, - keyAndNamespace.f1, - safeNamespaceSerializer, - outputView); + keyGroup, + keyAndNamespace.f0, + safeKeySerializer, + keyAndNamespace.f1, + safeNamespaceSerializer, + outputView); final byte[] keyPrefixBytes = outputView.toByteArray(); @@ -277,10 +278,11 @@ public byte[] getSerializedValue( final TypeSerializer dupUserValueSerializer = serializer.getValueSerializer(); final Iterator> iterator = new RocksDBMapIterator>( - backend.db, - keyPrefixBytes, - dupUserKeySerializer, - dupUserValueSerializer) { + backend.db, + keyPrefixBytes, + dupUserKeySerializer, + dupUserValueSerializer, + inputView) { @Override public Map.Entry next() { @@ -313,15 +315,11 @@ private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOE return dataOutputView.toByteArray(); } - private byte[] serializeUserValue(UV userValue) throws IOException { - return serializeUserValue(userValue, userValueSerializer); - } + private static byte[] serializeUserValue( + UV userValue, + TypeSerializer valueSerializer, + ByteArrayDataOutputView dataOutputView) throws IOException { - private UV deserializeUserValue(byte[] rawValueBytes) throws IOException { - return deserializeUserValue(rawValueBytes, userValueSerializer); - } - - private byte[] serializeUserValue(UV userValue, TypeSerializer valueSerializer) throws IOException { dataOutputView.reset(); if (userValue == null) { @@ -334,12 +332,20 @@ private byte[] serializeUserValue(UV userValue, TypeSerializer valueSerializ return dataOutputView.toByteArray(); } - private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer keySerializer) throws IOException { + private static UK deserializeUserKey( + ByteArrayDataInputView dataInputView, + int userKeyOffset, + byte[] rawKeyBytes, + TypeSerializer keySerializer) throws IOException { dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset); return keySerializer.deserialize(dataInputView); } - private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer valueSerializer) throws IOException { + private static UV deserializeUserValue( + ByteArrayDataInputView dataInputView, + byte[] rawValueBytes, + TypeSerializer valueSerializer) throws IOException { + dataInputView.setData(rawValueBytes); boolean isNull = dataInputView.readBoolean(); @@ -388,17 +394,22 @@ private class RocksDBMapEntry implements Map.Entry { /** The offset of User Key offset in raw key bytes. */ private final int userKeyOffset; - private TypeSerializer keySerializer; + private final TypeSerializer keySerializer; + + private final TypeSerializer valueSerializer; - private TypeSerializer valueSerializer; + private final ByteArrayDataInputView dataInputView; + private final ByteArrayDataOutputView dataOutputView; RocksDBMapEntry( - @Nonnull final RocksDB db, - @Nonnegative final int userKeyOffset, - @Nonnull final byte[] rawKeyBytes, - @Nonnull final byte[] rawValueBytes, - @Nonnull final TypeSerializer keySerializer, - @Nonnull final TypeSerializer valueSerializer) { + @Nonnull final RocksDB db, + @Nonnegative final int userKeyOffset, + @Nonnull final byte[] rawKeyBytes, + @Nonnull final byte[] rawValueBytes, + @Nonnull final TypeSerializer keySerializer, + @Nonnull final TypeSerializer valueSerializer, + @Nonnull ByteArrayDataInputView dataInputView, + @Nonnull ByteArrayDataOutputView dataOutputView) { this.db = db; this.userKeyOffset = userKeyOffset; @@ -408,6 +419,8 @@ private class RocksDBMapEntry implements Map.Entry { this.rawKeyBytes = rawKeyBytes; this.rawValueBytes = rawValueBytes; this.deleted = false; + this.dataInputView = dataInputView; + this.dataOutputView = dataOutputView; } public void remove() { @@ -425,7 +438,7 @@ public void remove() { public UK getKey() { if (userKey == null) { try { - userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer); + userKey = deserializeUserKey(dataInputView, userKeyOffset, rawKeyBytes, keySerializer); } catch (IOException e) { throw new FlinkRuntimeException("Error while deserializing the user key.", e); } @@ -441,7 +454,7 @@ public UV getValue() { } else { if (userValue == null) { try { - userValue = deserializeUserValue(rawValueBytes, valueSerializer); + userValue = deserializeUserValue(dataInputView, rawValueBytes, valueSerializer); } catch (IOException e) { throw new FlinkRuntimeException("Error while deserializing the user value.", e); } @@ -461,7 +474,7 @@ public UV setValue(UV value) { try { userValue = value; - rawValueBytes = serializeUserValue(value, valueSerializer); + rawValueBytes = serializeUserValue(value, valueSerializer, dataOutputView); db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); } catch (IOException | RocksDBException e) { @@ -499,17 +512,20 @@ private abstract class RocksDBMapIterator implements Iterator { private final TypeSerializer keySerializer; private final TypeSerializer valueSerializer; + private final ByteArrayDataInputView dataInputView; RocksDBMapIterator( - final RocksDB db, - final byte[] keyPrefixBytes, - final TypeSerializer keySerializer, - final TypeSerializer valueSerializer) { + final RocksDB db, + final byte[] keyPrefixBytes, + final TypeSerializer keySerializer, + final TypeSerializer valueSerializer, + ByteArrayDataInputView dataInputView) { this.db = db; this.keyPrefixBytes = keyPrefixBytes; this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; + this.dataInputView = dataInputView; } @Override @@ -564,7 +580,7 @@ private void loadCache() { * The iteration starts from the prefix bytes at the first loading. The cache then is * reloaded when the next entry to return is the last one in the cache. At that time, * we will start the iterating from the last returned entry. - */ + */ RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); @@ -597,7 +613,9 @@ private void loadCache() { iterator.key(), iterator.value(), keySerializer, - valueSerializer); + valueSerializer, + dataInputView, + dataOutputView); cacheEntries.add(entry);