From 3fd6587208a16940eab327ed37fa4e23d0b7eb62 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 22 Aug 2018 13:31:49 +0200 Subject: [PATCH] [FLINK-10176][state] Replace ByteArrayData[Input|Output]View with (enhanced) Data[Output|InputDe]Serializer This closes #6583. --- ...nformationKeyValueSerializationSchema.java | 4 +- .../TypeInformationSerializationSchema.java | 4 +- .../core/memory/ByteArrayDataInputView.java | 60 ---------- .../core/memory/ByteArrayDataOutputView.java | 61 ---------- .../core/memory/DataInputDeserializer.java | 54 +++++---- .../ttl/TtlStateSnapshotTransformer.java | 8 +- .../state/AbstractRocksDBAppendingState.java | 2 +- .../streaming/state/AbstractRocksDBState.java | 30 ++--- .../state/RocksDBAggregatingState.java | 12 +- .../state/RocksDBCachingPriorityQueueSet.java | 23 ++-- .../state/RocksDBKeySerializationUtils.java | 20 ++-- .../state/RocksDBKeyedStateBackend.java | 16 +-- .../streaming/state/RocksDBListState.java | 35 +++--- .../streaming/state/RocksDBMapState.java | 112 +++++++++--------- .../streaming/state/RocksDBReducingState.java | 12 +- .../streaming/state/RocksDBValueState.java | 10 +- .../iterator/RocksStateKeysIterator.java | 10 +- ...onedPriorityQueueWithRocksDBStoreTest.java | 8 +- ...RocksDBIncrementalCheckpointUtilsTest.java | 16 +-- .../RocksDBKeySerializationUtilsTest.java | 24 ++-- .../RocksDBRocksStateKeysIteratorTest.java | 6 +- .../source/SerializedCheckpointData.java | 2 +- 22 files changed, 211 insertions(+), 318 deletions(-) delete mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java delete mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index 3be5779ceec34..cc4a54bfeef98 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -101,11 +101,11 @@ public Tuple2 deserialize(byte[] messageKey, byte[] message, String topic, V value = null; if (messageKey != null) { - inputDeserializer.setBuffer(messageKey, 0, messageKey.length); + inputDeserializer.setBuffer(messageKey); key = keySerializer.deserialize(inputDeserializer); } if (message != null) { - inputDeserializer.setBuffer(message, 0, message.length); + inputDeserializer.setBuffer(message); value = valueSerializer.deserialize(inputDeserializer); } return new Tuple2<>(key, value); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java index 78da3fadc3e67..6be265a515200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java @@ -81,9 +81,9 @@ public TypeInformationSerializationSchema(TypeInformation typeInfo, TypeSeria @Override public T deserialize(byte[] message) { if (dis 
!= null) { - dis.setBuffer(message, 0, message.length); + dis.setBuffer(message); } else { - dis = new DataInputDeserializer(message, 0, message.length); + dis = new DataInputDeserializer(message); } try { diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java deleted file mode 100644 index 698a9f97dc05d..0000000000000 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory; - -import javax.annotation.Nonnull; - -/** - * Reusable adapter to {@link DataInputView} that operates on given byte-arrays. - */ -public class ByteArrayDataInputView extends DataInputViewStreamWrapper { - - @Nonnull - private final ByteArrayInputStreamWithPos inStreamWithPos; - - public ByteArrayDataInputView() { - super(new ByteArrayInputStreamWithPos()); - this.inStreamWithPos = (ByteArrayInputStreamWithPos) in; - } - - public ByteArrayDataInputView(@Nonnull byte[] buffer) { - this(buffer, 0, buffer.length); - } - - public ByteArrayDataInputView(@Nonnull byte[] buffer, int offset, int length) { - this(); - setData(buffer, offset, length); - } - - public int getPosition() { - return inStreamWithPos.getPosition(); - } - - public void setPosition(int pos) { - inStreamWithPos.setPosition(pos); - } - - public void setData(@Nonnull byte[] buffer, int offset, int length) { - inStreamWithPos.setBuffer(buffer, offset, length); - } - - public void setData(@Nonnull byte[] buffer) { - setData(buffer, 0, buffer.length); - } -} diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java deleted file mode 100644 index a96f3d3fef195..0000000000000 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
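// ---------------------------------------------------------------------
// Illustrative summary (editor's sketch, inferred from the call-site
// replacements in this patch; not part of the commit itself): the two
// deleted adapter classes map onto the enhanced serializers roughly as
//
//   ByteArrayDataInputView           ->  DataInputDeserializer
//     setData(buf)                   ->    setBuffer(buf)
//     setData(buf, off, len)         ->    setBuffer(buf, off, len)
//     getPosition()                  ->    getPosition()
//
//   ByteArrayDataOutputView          ->  DataOutputSerializer
//     reset()                        ->    clear()
//     toByteArray()                  ->    getCopyOfBuffer()
//     getPosition()                  ->    length()
//     getInternalBufferReference()   ->    getSharedBuffer()
// ---------------------------------------------------------------------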
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory; - -import javax.annotation.Nonnull; - -/** - * Adapter to {@link DataOutputView} that operates on a byte-array and offers read/write access to the current position. - */ -public class ByteArrayDataOutputView extends DataOutputViewStreamWrapper { - - @Nonnull - private final ByteArrayOutputStreamWithPos outputStreamWithPos; - - public ByteArrayDataOutputView() { - this(64); - } - - public ByteArrayDataOutputView(int initialSize) { - super(new ByteArrayOutputStreamWithPos(initialSize)); - this.outputStreamWithPos = (ByteArrayOutputStreamWithPos) out; - } - - public void reset() { - outputStreamWithPos.reset(); - } - - @Nonnull - public byte[] toByteArray() { - return outputStreamWithPos.toByteArray(); - } - - public int getPosition() { - return outputStreamWithPos.getPosition(); - } - - public void setPosition(int position) { - outputStreamWithPos.setPosition(position); - } - - @Nonnull - public byte[] getInternalBufferReference() { - return outputStreamWithPos.getBuf(); - } -} diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java index 11973e836c916..ffdd828e77d3b 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java @@ -18,6 +18,9 @@ package org.apache.flink.core.memory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.EOFException; import java.io.IOException; import java.io.UTFDataFormatException; @@ -29,6 +32,7 @@ */ public class DataInputDeserializer implements DataInputView, java.io.Serializable { + private static final byte[] EMPTY = new byte[0]; private static final long serialVersionUID = 1L; // ------------------------------------------------------------------------ @@ -41,17 +45,19 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl // ------------------------------------------------------------------------ - public DataInputDeserializer() {} + public DataInputDeserializer() { + setBuffer(EMPTY); + } - public DataInputDeserializer(byte[] buffer) { - setBuffer(buffer, 0, buffer.length); + public DataInputDeserializer(@Nonnull byte[] buffer) { + setBufferInternal(buffer, 0, buffer.length); } - public DataInputDeserializer(byte[] buffer, int start, int len) { + public DataInputDeserializer(@Nonnull byte[] buffer, int start, int len) { setBuffer(buffer, start, len); } - public DataInputDeserializer(ByteBuffer buffer) { + public DataInputDeserializer(@Nonnull ByteBuffer buffer) { setBuffer(buffer); } @@ -59,7 +65,7 @@ public DataInputDeserializer(ByteBuffer buffer) { // Changing buffers // ------------------------------------------------------------------------ - public void setBuffer(ByteBuffer buffer) { + public void setBuffer(@Nonnull ByteBuffer buffer) { if (buffer.hasArray()) { this.buffer = buffer.array(); this.position = buffer.arrayOffset() + buffer.position(); @@ -76,15 +82,20 @@ public void setBuffer(ByteBuffer buffer) { } } - public void setBuffer(byte[] buffer, int start, int len) { - if (buffer == null) { - throw new NullPointerException(); - } + public void setBuffer(@Nonnull byte[] buffer, int start, int len) { if (start < 0 || len < 0 || start + len > buffer.length) { - throw new IllegalArgumentException(); 
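// ---------------------------------------------------------------------
// Editor's sketch (illustrative only; 'record', 'offset' and 'length'
// are placeholder names): with the no-arg constructor above starting on
// an EMPTY buffer, a single deserializer instance can be re-pointed at
// incoming arrays -- the reuse pattern the migrated call sites rely on.
//
//   DataInputDeserializer in = new DataInputDeserializer(); // empty buffer
//   in.setBuffer(record);                  // whole array
//   in.setBuffer(record, offset, length);  // bounded slice, bounds-checked
//   in.setBuffer(ByteBuffer.wrap(record)); // array-backed or direct buffer
// ---------------------------------------------------------------------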
+ throw new IllegalArgumentException("Invalid bounds."); } + setBufferInternal(buffer, start, len); + } + + public void setBuffer(@Nonnull byte[] buffer) { + setBufferInternal(buffer, 0, buffer.length); + } + + private void setBufferInternal(@Nonnull byte[] buffer, int start, int len) { this.buffer = buffer; this.position = start; this.end = start + len; @@ -144,12 +155,12 @@ public float readFloat() throws IOException { } @Override - public void readFully(byte[] b) throws IOException { + public void readFully(@Nonnull byte[] b) throws IOException { readFully(b, 0, b.length); } @Override - public void readFully(byte[] b, int off, int len) throws IOException { + public void readFully(@Nonnull byte[] b, int off, int len) throws IOException { if (len >= 0) { if (off <= b.length - len) { if (this.position <= this.end - len) { @@ -161,7 +172,7 @@ public void readFully(byte[] b, int off, int len) throws IOException { } else { throw new ArrayIndexOutOfBoundsException(); } - } else if (len < 0) { + } else { throw new IllegalArgumentException("Length may not be negative."); } } @@ -182,6 +193,7 @@ public int readInt() throws IOException { } } + @Nullable @Override public String readLine() throws IOException { if (this.position < this.end) { @@ -229,6 +241,7 @@ public short readShort() throws IOException { } } + @Nonnull @Override public String readUTF() throws IOException { int utflen = readUnsignedShort(); @@ -319,7 +332,7 @@ public int readUnsignedShort() throws IOException { } @Override - public int skipBytes(int n) throws IOException { + public int skipBytes(int n) { if (this.position <= this.end - n) { this.position += n; return n; @@ -340,10 +353,7 @@ public void skipBytesToRead(int numBytes) throws IOException { } @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null){ - throw new NullPointerException("Byte array b cannot be null."); - } + public int read(@Nonnull byte[] b, int off, int len) throws IOException { if (off < 0){ throw new IndexOutOfBoundsException("Offset cannot be negative."); @@ -370,10 +380,14 @@ public int read(byte[] b, int off, int len) throws IOException { } @Override - public int read(byte[] b) throws IOException { + public int read(@Nonnull byte[] b) throws IOException { return read(b, 0, b.length); } + public int getPosition() { + return position; + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java index 1ee05cd0392f8..e3706ec294471 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state.ttl; import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.core.memory.ByteArrayDataInputView; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer; import org.apache.flink.util.FlinkRuntimeException; @@ -34,12 +34,12 @@ abstract class TtlStateSnapshotTransformer implements CollectionStateSnapshotTransformer { 
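// ---------------------------------------------------------------------
// Editor's sketch (illustrative; 'readTimestamp' and 'serializedTtlValue'
// are made-up names): a serialized TtlValue starts with its last-access
// timestamp, so an expiry check only needs the first Long.BYTES bytes of
// the array -- which is what deserializeTs() below does with a reusable
// DataInputDeserializer.
//
//   private final DataInputDeserializer div = new DataInputDeserializer();
//
//   long readTimestamp(byte[] serializedTtlValue) throws IOException {
//       div.setBuffer(serializedTtlValue, 0, Long.BYTES);
//       return LongSerializer.INSTANCE.deserialize(div);
//   }
// ---------------------------------------------------------------------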
private final TtlTimeProvider ttlTimeProvider; final long ttl; - private final ByteArrayDataInputView div; + private final DataInputDeserializer div; TtlStateSnapshotTransformer(@Nonnull TtlTimeProvider ttlTimeProvider, long ttl) { this.ttlTimeProvider = ttlTimeProvider; this.ttl = ttl; - this.div = new ByteArrayDataInputView(); + this.div = new DataInputDeserializer(); } TtlValue filterTtlValue(TtlValue value) { @@ -55,7 +55,7 @@ boolean expired(long ts) { } long deserializeTs(byte[] value) throws IOException { - div.setData(value, 0, Long.BYTES); + div.setBuffer(value, 0, Long.BYTES); return LongSerializer.INSTANCE.deserialize(div); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java index 2a9ab7589a9ae..8c0f4d7da7b22 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java @@ -61,7 +61,7 @@ SV getInternal(byte[] key) { if (valueBytes == null) { return null; } - dataInputView.setData(valueBytes); + dataInputView.setBuffer(valueBytes); return valueSerializer.deserialize(dataInputView); } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 65b7f1fa4a788..8b8fbb23a9958 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -20,8 +20,8 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.internal.InternalKvState; @@ -66,9 +66,9 @@ public abstract class AbstractRocksDBState implements protected final WriteOptions writeOptions; - protected final ByteArrayDataOutputView dataOutputView; + protected final DataOutputSerializer dataOutputView; - protected final ByteArrayDataInputView dataInputView; + protected final DataInputDeserializer dataInputView; private final boolean ambiguousKeyPossible; @@ -97,8 +97,8 @@ protected AbstractRocksDBState( this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer"); this.defaultValue = defaultValue; - this.dataOutputView = new ByteArrayDataOutputView(128); - this.dataInputView = new ByteArrayDataInputView(); + this.dataOutputView = new DataOutputSerializer(128); + 
this.dataInputView = new DataInputDeserializer(); this.ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer); } @@ -109,7 +109,7 @@ protected AbstractRocksDBState( public void clear() { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); + byte[] key = dataOutputView.getCopyOfBuffer(); backend.db.delete(columnFamily, writeOptions, key); } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while removing entry from RocksDB", e); @@ -141,7 +141,7 @@ public byte[] getSerializedValue( // we cannot reuse the keySerializationStream member since this method // is called concurrently to the other ones and it may thus contain garbage - ByteArrayDataOutputView tmpKeySerializationView = new ByteArrayDataOutputView(128); + DataOutputSerializer tmpKeySerializationView = new DataOutputSerializer(128); writeKeyWithGroupAndNamespace( keyGroup, @@ -151,13 +151,13 @@ public byte[] getSerializedValue( safeNamespaceSerializer, tmpKeySerializationView); - return backend.db.get(columnFamily, tmpKeySerializationView.toByteArray()); + return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } byte[] getKeyBytes() { try { writeCurrentKeyWithGroupAndNamespace(); - return dataOutputView.toByteArray(); + return dataOutputView.getCopyOfBuffer(); } catch (IOException e) { throw new FlinkRuntimeException("Error while serializing key", e); } @@ -165,9 +165,9 @@ byte[] getKeyBytes() { byte[] getValueBytes(V value) { try { - dataOutputView.reset(); + dataOutputView.clear(); valueSerializer.serialize(value, dataOutputView); - return dataOutputView.toByteArray(); + return dataOutputView.getCopyOfBuffer(); } catch (IOException e) { throw new FlinkRuntimeException("Error while serializing value", e); } @@ -183,7 +183,7 @@ protected void writeCurrentKeyWithGroupAndNamespace() throws IOException { protected void writeKeyWithGroupAndNamespace( int keyGroup, K key, N namespace, - ByteArrayDataOutputView keySerializationDataOutputView) throws IOException { + DataOutputSerializer keySerializationDataOutputView) throws IOException { writeKeyWithGroupAndNamespace( keyGroup, @@ -200,13 +200,13 @@ protected void writeKeyWithGroupAndNamespace( final TypeSerializer keySerializer, final N namespace, final TypeSerializer namespaceSerializer, - final ByteArrayDataOutputView keySerializationDataOutputView) throws IOException { + final DataOutputSerializer keySerializationDataOutputView) throws IOException { Preconditions.checkNotNull(key, "No key set. 
This method should not be called outside of a keyed context."); Preconditions.checkNotNull(keySerializer); Preconditions.checkNotNull(namespaceSerializer); - keySerializationDataOutputView.reset(); + keySerializationDataOutputView.clear(); RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView); RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible); RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java index 4f9ef2f811c8f..2085fb86256a3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java @@ -121,12 +121,12 @@ public void mergeNamespaces(N target, Collection sources) { if (source != null) { writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView); - final byte[] sourceKey = dataOutputView.toByteArray(); + final byte[] sourceKey = dataOutputView.getCopyOfBuffer(); final byte[] valueBytes = backend.db.get(columnFamily, sourceKey); backend.db.delete(columnFamily, writeOptions, sourceKey); if (valueBytes != null) { - dataInputView.setData(valueBytes); + dataInputView.setBuffer(valueBytes); ACC value = valueSerializer.deserialize(dataInputView); if (current != null) { @@ -144,23 +144,23 @@ public void mergeNamespaces(N target, Collection sources) { // create the target full-binary-key writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView); - final byte[] targetKey = dataOutputView.toByteArray(); + final byte[] targetKey = dataOutputView.getCopyOfBuffer(); final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey); if (targetValueBytes != null) { // target also had a value, merge - dataInputView.setData(targetValueBytes); + dataInputView.setBuffer(targetValueBytes); ACC value = valueSerializer.deserialize(dataInputView); current = aggFunction.merge(current, value); } // serialize the resulting value - dataOutputView.reset(); + dataOutputView.clear(); valueSerializer.serialize(current, dataOutputView); // write the resulting value - backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray()); + backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.getCopyOfBuffer()); } } catch (Exception e) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java index 68b5b5fdee35d..364185a2f3e12 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java @@ -19,8 +19,8 @@ package org.apache.flink.contrib.streaming.state; import 
org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.util.CloseableIterator; @@ -84,11 +84,11 @@ public class RocksDBCachingPriorityQueueSet /** Output view that helps to serialize elements. */ @Nonnull - private final ByteArrayDataOutputView outputView; + private final DataOutputSerializer outputView; /** Input view that helps to de-serialize elements. */ @Nonnull - private final ByteArrayDataInputView inputView; + private final DataInputDeserializer inputView; /** In memory cache that holds a head-subset of the elements stored in RocksDB. */ @Nonnull @@ -114,8 +114,8 @@ public class RocksDBCachingPriorityQueueSet @Nonnull RocksDB db, @Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull TypeSerializer byteOrderProducingSerializer, - @Nonnull ByteArrayDataOutputView outputStream, - @Nonnull ByteArrayDataInputView inputStream, + @Nonnull DataOutputSerializer outputStream, + @Nonnull DataInputDeserializer inputStream, @Nonnull RocksDBWriteBatchWrapper batchWrapper, @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) { this.db = db; @@ -357,7 +357,7 @@ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) { @Nonnull private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) { - outputView.reset(); + outputView.clear(); try { RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView); @@ -365,16 +365,16 @@ private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) { throw new FlinkRuntimeException("Could not write key-group bytes.", e); } - return outputView.toByteArray(); + return outputView.getCopyOfBuffer(); } @Nonnull private byte[] serializeElement(@Nonnull E element) { try { - outputView.reset(); + outputView.clear(); outputView.write(groupPrefixBytes); byteOrderProducingSerializer.serialize(element, outputView); - return outputView.toByteArray(); + return outputView.getCopyOfBuffer(); } catch (IOException e) { throw new FlinkRuntimeException("Error while serializing the element.", e); } @@ -383,7 +383,8 @@ private byte[] serializeElement(@Nonnull E element) { @Nonnull private E deserializeElement(@Nonnull byte[] bytes) { try { - inputView.setData(bytes, groupPrefixBytes.length, bytes.length); + final int numPrefixBytes = groupPrefixBytes.length; + inputView.setBuffer(bytes, numPrefixBytes, bytes.length - numPrefixBytes); return byteOrderProducingSerializer.deserialize(inputView); } catch (IOException e) { throw new FlinkRuntimeException("Error while deserializing the element.", e); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java index 7c9e3f8c3f0ae..d8844bfece12a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java @@ -18,9 +18,9 @@ package 
org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; import java.io.IOException; @@ -41,7 +41,7 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws public static K readKey( TypeSerializer keySerializer, - ByteArrayDataInputView inputView, + DataInputDeserializer inputView, boolean ambiguousKeyPossible) throws IOException { int beforeRead = inputView.getPosition(); K key = keySerializer.deserialize(inputView); @@ -54,7 +54,7 @@ public static K readKey( public static N readNamespace( TypeSerializer namespaceSerializer, - ByteArrayDataInputView inputView, + DataInputDeserializer inputView, boolean ambiguousKeyPossible) throws IOException { int beforeRead = inputView.getPosition(); N namespace = namespaceSerializer.deserialize(inputView); @@ -68,10 +68,10 @@ public static N readNamespace( public static void writeNameSpace( N namespace, TypeSerializer namespaceSerializer, - ByteArrayDataOutputView keySerializationDataOutputView, + DataOutputSerializer keySerializationDataOutputView, boolean ambiguousKeyPossible) throws IOException { - int beforeWrite = keySerializationDataOutputView.getPosition(); + int beforeWrite = keySerializationDataOutputView.length(); namespaceSerializer.serialize(namespace, keySerializationDataOutputView); if (ambiguousKeyPossible) { @@ -96,10 +96,10 @@ public static void writeKeyGroup( public static void writeKey( K key, TypeSerializer keySerializer, - ByteArrayDataOutputView keySerializationDataOutputView, + DataOutputSerializer keySerializationDataOutputView, boolean ambiguousKeyPossible) throws IOException { //write key - int beforeWrite = keySerializationDataOutputView.getPosition(); + int beforeWrite = keySerializationDataOutputView.length(); keySerializer.serialize(key, keySerializationDataOutputView); if (ambiguousKeyPossible) { @@ -117,8 +117,8 @@ private static void readVariableIntBytes(DataInputView inputView, int value) thr private static void writeLengthFrom( int fromPosition, - ByteArrayDataOutputView keySerializationDateDataOutputView) throws IOException { - int length = keySerializationDateDataOutputView.getPosition() - fromPosition; + DataOutputSerializer keySerializationDateDataOutputView) throws IOException { + int length = keySerializationDateDataOutputView.length() - fromPosition; writeVariableIntBytes(length, keySerializationDateDataOutputView); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 60baaedae7514..42a1e26b8b55d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -43,10 +43,10 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import 
org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -328,7 +328,7 @@ public Stream getKeys(String state, N namespace) { (RegisteredKeyValueStateBackendMetaInfo) columnInfo.f1; final TypeSerializer namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer(); - final ByteArrayDataOutputView namespaceOutputView = new ByteArrayDataOutputView(8); + final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8); boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); final byte[] nameSpaceBytes; try { @@ -337,7 +337,7 @@ public Stream getKeys(String state, N namespace) { namespaceSerializer, namespaceOutputView, ambiguousKeyPossible); - nameSpaceBytes = namespaceOutputView.toByteArray(); + nameSpaceBytes = namespaceOutputView.getCopyOfBuffer(); } catch (IOException ex) { throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); } @@ -1501,15 +1501,15 @@ class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { /** A shared buffer to serialize elements for the priority queue. */ @Nonnull - private final ByteArrayDataOutputView sharedElementOutView; + private final DataOutputSerializer sharedElementOutView; /** A shared buffer to de-serialize elements for the priority queue. 
*/ @Nonnull - private final ByteArrayDataInputView sharedElementInView; + private final DataInputDeserializer sharedElementInView; RocksDBPriorityQueueSetFactory() { - this.sharedElementOutView = new ByteArrayDataOutputView(); - this.sharedElementInView = new ByteArrayDataInputView(); + this.sharedElementOutView = new DataOutputSerializer(128); + this.sharedElementInView = new DataInputDeserializer(); } @Nonnull diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index cdd7afb7d9a22..f70c6a57bad60 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -24,9 +24,8 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalListState; @@ -115,7 +114,7 @@ public Iterable get() { public List getInternal() { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); + byte[] key = dataOutputView.getCopyOfBuffer(); byte[] valueBytes = backend.db.get(columnFamily, key); return deserializeList(valueBytes); } catch (IOException | RocksDBException e) { @@ -129,7 +128,7 @@ private List deserializeList( return null; } - dataInputView.setData(valueBytes); + dataInputView.setBuffer(valueBytes); List result = new ArrayList<>(); V next; @@ -139,7 +138,7 @@ private List deserializeList( return result; } - private static V deserializeNextElement(DataInputViewStreamWrapper in, TypeSerializer elementSerializer) { + private static V deserializeNextElement(DataInputDeserializer in, TypeSerializer elementSerializer) { try { if (in.available() > 0) { V element = elementSerializer.deserialize(in); @@ -160,10 +159,10 @@ public void add(V value) { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); - dataOutputView.reset(); + byte[] key = dataOutputView.getCopyOfBuffer(); + dataOutputView.clear(); elementSerializer.serialize(value, dataOutputView); - backend.db.merge(columnFamily, writeOptions, key, dataOutputView.toByteArray()); + backend.db.merge(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer()); } catch (Exception e) { throw new FlinkRuntimeException("Error while adding data to RocksDB", e); } @@ -182,14 +181,14 @@ public void mergeNamespaces(N target, Collection sources) { try { // create the target full-binary-key writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView); - final byte[] targetKey = dataOutputView.toByteArray(); + final byte[] targetKey = dataOutputView.getCopyOfBuffer(); // merge the sources to the target for (N source : sources) { if (source != null) { 
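// ---------------------------------------------------------------------
// Editor's sketch (illustrative only): the recurring write-side pattern
// in these RocksDB state classes after the migration is
//
//   dataOutputView.clear();                          // reuse shared buffer
//   serializer.serialize(value, dataOutputView);
//   byte[] bytes = dataOutputView.getCopyOfBuffer(); // copy handed to RocksDB
//
// with the matching read side
//
//   dataInputView.setBuffer(bytes);
//   V value = serializer.deserialize(dataInputView);
//
// getCopyOfBuffer() is used wherever the array outlives the next clear(),
// e.g. as a RocksDB key or value.
// ---------------------------------------------------------------------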
writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView); - byte[] sourceKey = dataOutputView.toByteArray(); + byte[] sourceKey = dataOutputView.getCopyOfBuffer(); byte[] valueBytes = backend.db.get(columnFamily, sourceKey); backend.db.delete(columnFamily, writeOptions, sourceKey); @@ -218,7 +217,7 @@ public void updateInternal(List values) { if (!values.isEmpty()) { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); + byte[] key = dataOutputView.getCopyOfBuffer(); byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView); backend.db.put(columnFamily, writeOptions, key, premerge); } catch (IOException | RocksDBException e) { @@ -234,7 +233,7 @@ public void addAll(List values) { if (!values.isEmpty()) { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); + byte[] key = dataOutputView.getCopyOfBuffer(); byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView); backend.db.merge(columnFamily, writeOptions, key, premerge); } catch (IOException | RocksDBException e) { @@ -246,9 +245,9 @@ public void addAll(List values) { private static byte[] getPreMergedValue( List values, TypeSerializer elementSerializer, - ByteArrayDataOutputView keySerializationStream) throws IOException { + DataOutputSerializer keySerializationStream) throws IOException { - keySerializationStream.reset(); + keySerializationStream.clear(); boolean first = true; for (V value : values) { Preconditions.checkNotNull(value, "You cannot add null to a ListState."); @@ -260,7 +259,7 @@ private static byte[] getPreMergedValue( elementSerializer.serialize(value, keySerializationStream); } - return keySerializationStream.toByteArray(); + return keySerializationStream.getCopyOfBuffer(); } @SuppressWarnings("unchecked") @@ -280,7 +279,7 @@ static IS create( static class StateSnapshotTransformerWrapper implements StateSnapshotTransformer { private final StateSnapshotTransformer elementTransformer; private final TypeSerializer elementSerializer; - private final ByteArrayDataOutputView out = new ByteArrayDataOutputView(128); + private final DataOutputSerializer out = new DataOutputSerializer(128); private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy; StateSnapshotTransformerWrapper(StateSnapshotTransformer elementTransformer, TypeSerializer elementSerializer) { @@ -298,7 +297,7 @@ public byte[] filterOrTransform(@Nullable byte[] value) { return null; } List result = new ArrayList<>(); - ByteArrayDataInputView in = new ByteArrayDataInputView(value); + DataInputDeserializer in = new DataInputDeserializer(value); T next; int prevPosition = 0; while ((next = deserializeNextElement(in, elementSerializer)) != null) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 5f9da7dd810d0..5c9f7f9f30c2c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -24,8 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import 
org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; @@ -83,11 +83,11 @@ class RocksDBMapState * @param backend The backend for which this state is bind to. */ private RocksDBMapState( - ColumnFamilyHandle columnFamily, - TypeSerializer namespaceSerializer, - TypeSerializer> valueSerializer, - Map defaultValue, - RocksDBKeyedStateBackend backend) { + ColumnFamilyHandle columnFamily, + TypeSerializer namespaceSerializer, + TypeSerializer> valueSerializer, + Map defaultValue, + RocksDBKeyedStateBackend backend) { super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend); @@ -218,7 +218,7 @@ public Map.Entry next() { public void clear() { try { try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily); - WriteBatch writeBatch = new WriteBatch(128)) { + WriteBatch writeBatch = new WriteBatch(128)) { final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace(); iterator.seek(keyPrefixBytes); @@ -243,10 +243,10 @@ public void clear() { @Override @SuppressWarnings("unchecked") public byte[] getSerializedValue( - final byte[] serializedKeyAndNamespace, - final TypeSerializer safeKeySerializer, - final TypeSerializer safeNamespaceSerializer, - final TypeSerializer> safeValueSerializer) throws Exception { + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer> safeValueSerializer) throws Exception { Preconditions.checkNotNull(serializedKeyAndNamespace); Preconditions.checkNotNull(safeKeySerializer); @@ -255,22 +255,22 @@ public byte[] getSerializedValue( //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( - serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); + serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups()); - ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128); - ByteArrayDataInputView inputView = new ByteArrayDataInputView(); + DataOutputSerializer outputView = new DataOutputSerializer(128); + DataInputDeserializer inputView = new DataInputDeserializer(); writeKeyWithGroupAndNamespace( - keyGroup, - keyAndNamespace.f0, - safeKeySerializer, - keyAndNamespace.f1, - safeNamespaceSerializer, - outputView); + keyGroup, + keyAndNamespace.f0, + safeKeySerializer, + keyAndNamespace.f1, + safeNamespaceSerializer, + outputView); - final byte[] keyPrefixBytes = outputView.toByteArray(); + final byte[] keyPrefixBytes = outputView.getCopyOfBuffer(); final MapSerializer serializer = (MapSerializer) safeValueSerializer; @@ -278,11 +278,11 @@ public byte[] getSerializedValue( final TypeSerializer dupUserValueSerializer = serializer.getValueSerializer(); final Iterator> iterator = new RocksDBMapIterator>( - backend.db, - keyPrefixBytes, - dupUserKeySerializer, - dupUserValueSerializer, - inputView) { + backend.db, + keyPrefixBytes, + dupUserKeySerializer, + 
dupUserValueSerializer, + inputView) { @Override public Map.Entry next() { @@ -305,22 +305,22 @@ public Map.Entry next() { private byte[] serializeCurrentKeyAndNamespace() throws IOException { writeCurrentKeyWithGroupAndNamespace(); - return dataOutputView.toByteArray(); + return dataOutputView.getCopyOfBuffer(); } private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException { serializeCurrentKeyAndNamespace(); userKeySerializer.serialize(userKey, dataOutputView); - return dataOutputView.toByteArray(); + return dataOutputView.getCopyOfBuffer(); } private static byte[] serializeUserValue( UV userValue, TypeSerializer valueSerializer, - ByteArrayDataOutputView dataOutputView) throws IOException { + DataOutputSerializer dataOutputView) throws IOException { - dataOutputView.reset(); + dataOutputView.clear(); if (userValue == null) { dataOutputView.writeBoolean(true); @@ -329,24 +329,24 @@ private static byte[] serializeUserValue( valueSerializer.serialize(userValue, dataOutputView); } - return dataOutputView.toByteArray(); + return dataOutputView.getCopyOfBuffer(); } private static UK deserializeUserKey( - ByteArrayDataInputView dataInputView, + DataInputDeserializer dataInputView, int userKeyOffset, byte[] rawKeyBytes, TypeSerializer keySerializer) throws IOException { - dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset); + dataInputView.setBuffer(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset); return keySerializer.deserialize(dataInputView); } private static UV deserializeUserValue( - ByteArrayDataInputView dataInputView, + DataInputDeserializer dataInputView, byte[] rawValueBytes, TypeSerializer valueSerializer) throws IOException { - dataInputView.setData(rawValueBytes); + dataInputView.setBuffer(rawValueBytes); boolean isNull = dataInputView.readBoolean(); @@ -398,18 +398,18 @@ private class RocksDBMapEntry implements Map.Entry { private final TypeSerializer valueSerializer; - private final ByteArrayDataInputView dataInputView; - private final ByteArrayDataOutputView dataOutputView; + private final DataInputDeserializer dataInputView; + private final DataOutputSerializer dataOutputView; RocksDBMapEntry( - @Nonnull final RocksDB db, - @Nonnegative final int userKeyOffset, - @Nonnull final byte[] rawKeyBytes, - @Nonnull final byte[] rawValueBytes, - @Nonnull final TypeSerializer keySerializer, - @Nonnull final TypeSerializer valueSerializer, - @Nonnull ByteArrayDataInputView dataInputView, - @Nonnull ByteArrayDataOutputView dataOutputView) { + @Nonnull final RocksDB db, + @Nonnegative final int userKeyOffset, + @Nonnull final byte[] rawKeyBytes, + @Nonnull final byte[] rawValueBytes, + @Nonnull final TypeSerializer keySerializer, + @Nonnull final TypeSerializer valueSerializer, + @Nonnull DataInputDeserializer dataInputView, + @Nonnull DataOutputSerializer dataOutputView) { this.db = db; this.userKeyOffset = userKeyOffset; @@ -512,14 +512,14 @@ private abstract class RocksDBMapIterator implements Iterator { private final TypeSerializer keySerializer; private final TypeSerializer valueSerializer; - private final ByteArrayDataInputView dataInputView; + private final DataInputDeserializer dataInputView; RocksDBMapIterator( final RocksDB db, final byte[] keyPrefixBytes, final TypeSerializer keySerializer, final TypeSerializer valueSerializer, - ByteArrayDataInputView dataInputView) { + DataInputDeserializer dataInputView) { this.db = db; this.keyPrefixBytes = keyPrefixBytes; @@ -580,7 +580,7 @@ private void 
loadCache() { * The iteration starts from the prefix bytes at the first loading. The cache then is * reloaded when the next entry to return is the last one in the cache. At that time, * we will start the iterating from the last returned entry. - */ + */ RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); @@ -648,24 +648,24 @@ static class StateSnapshotTransformerWrapper implements StateSnapshotTransformer private static final byte[] NULL_VALUE; private static final byte NON_NULL_VALUE_PREFIX; static { - ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1); + DataOutputSerializer dov = new DataOutputSerializer(1); try { dov.writeBoolean(true); - NULL_VALUE = dov.toByteArray(); - dov.reset(); + NULL_VALUE = dov.getCopyOfBuffer(); + dov.clear(); dov.writeBoolean(false); - NON_NULL_VALUE_PREFIX = dov.toByteArray()[0]; + NON_NULL_VALUE_PREFIX = dov.getSharedBuffer()[0]; } catch (IOException e) { throw new FlinkRuntimeException("Failed to serialize boolean flag of map user null value", e); } } private final StateSnapshotTransformer elementTransformer; - private final ByteArrayDataInputView div; + private final DataInputDeserializer div; StateSnapshotTransformerWrapper(StateSnapshotTransformer originalTransformer) { this.elementTransformer = originalTransformer; - this.div = new ByteArrayDataInputView(); + this.div = new DataInputDeserializer(); } @Override @@ -692,7 +692,7 @@ public byte[] filterOrTransform(@Nullable byte[] value) { private boolean isNull(byte[] value) { try { - div.setData(value, 0, 1); + div.setBuffer(value, 0, 1); return div.readBoolean(); } catch (IOException e) { throw new FlinkRuntimeException("Failed to deserialize boolean flag of map user null value", e); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index d1fe3bd379854..138357b0d77c8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -115,12 +115,12 @@ public void mergeNamespaces(N target, Collection sources) { writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView); - final byte[] sourceKey = dataOutputView.toByteArray(); + final byte[] sourceKey = dataOutputView.getCopyOfBuffer(); final byte[] valueBytes = backend.db.get(columnFamily, sourceKey); backend.db.delete(columnFamily, writeOptions, sourceKey); if (valueBytes != null) { - dataInputView.setData(valueBytes); + dataInputView.setBuffer(valueBytes); V value = valueSerializer.deserialize(dataInputView); if (current != null) { @@ -138,11 +138,11 @@ public void mergeNamespaces(N target, Collection sources) { // create the target full-binary-key writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView); - final byte[] targetKey = dataOutputView.toByteArray(); + final byte[] targetKey = dataOutputView.getCopyOfBuffer(); final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey); if (targetValueBytes != null) { - dataInputView.setData(targetValueBytes); + dataInputView.setBuffer(targetValueBytes); // target also had a value, merge V value = 
valueSerializer.deserialize(dataInputView); @@ -150,11 +150,11 @@ public void mergeNamespaces(N target, Collection sources) { } // serialize the resulting value - dataOutputView.reset(); + dataOutputView.clear(); valueSerializer.serialize(current, dataOutputView); // write the resulting value - backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray()); + backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.getCopyOfBuffer()); } } catch (Exception e) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index e9399e12a3279..0ca90d4a5212f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -81,12 +81,12 @@ public TypeSerializer getValueSerializer() { public V value() { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); + byte[] key = dataOutputView.getCopyOfBuffer(); byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return getDefaultValue(); } - dataInputView.setData(valueBytes); + dataInputView.setBuffer(valueBytes); return valueSerializer.deserialize(dataInputView); } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e); @@ -102,10 +102,10 @@ public void update(V value) { try { writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.toByteArray(); - dataOutputView.reset(); + byte[] key = dataOutputView.getCopyOfBuffer(); + dataOutputView.clear(); valueSerializer.serialize(value, dataOutputView); - backend.db.put(columnFamily, writeOptions, key, dataOutputView.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer()); } catch (Exception e) { throw new FlinkRuntimeException("Error while adding data to RocksDB", e); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java index 0fa93dc8a1fb7..4f79d870d870a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; -import org.apache.flink.core.memory.ByteArrayDataInputView; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nonnull; @@ -53,7 +53,7 @@ public class RocksStateKeysIterator implements Iterator, AutoCloseable { private final boolean ambiguousKeyPossible; private final int keyGroupPrefixBytes; - private final ByteArrayDataInputView byteArrayDataInputView; + private final DataInputDeserializer 
byteArrayDataInputView; private K nextKey; private K previousKey; @@ -72,7 +72,7 @@ public RocksStateKeysIterator( this.nextKey = null; this.previousKey = null; this.ambiguousKeyPossible = ambiguousKeyPossible; - this.byteArrayDataInputView = new ByteArrayDataInputView(); + this.byteArrayDataInputView = new DataInputDeserializer(); } @Override @@ -107,8 +107,8 @@ public K next() { return tmpKey; } - private K deserializeKey(byte[] keyBytes, ByteArrayDataInputView readView) throws IOException { - readView.setData(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes); + private K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException { + readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes); return RocksDBKeySerializationUtils.readKey( keySerializer, byteArrayDataInputView, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java index ad8b74c975fb6..d402c3de5740d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java @@ -18,8 +18,8 @@ package org.apache.flink.contrib.streaming.state; -import org.apache.flink.core.memory.ByteArrayDataInputView; -import org.apache.flink.core.memory.ByteArrayDataOutputView; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.InternalPriorityQueueTestBase; import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; @@ -52,8 +52,8 @@ protected boolean testSetSemanticsAgainstDuplicateElements() { TestElement, RocksDBCachingPriorityQueueSet> newFactory() { return (keyGroupId, numKeyGroups, keyExtractorFunction, elementComparator) -> { - ByteArrayDataOutputView outputStreamWithPos = new ByteArrayDataOutputView(); - ByteArrayDataInputView inputStreamWithPos = new ByteArrayDataInputView(); + DataOutputSerializer outputStreamWithPos = new DataOutputSerializer(128); + DataInputDeserializer inputStreamWithPos = new DataInputDeserializer(); int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups); TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32); return new RocksDBCachingPriorityQueueSet<>( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java index 483b8fdd1dc76..942d85cf5ea10 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java @@ -18,7 +18,7 @@ package 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index ad8b74c975fb6..d402c3de5740d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -18,8 +18,8 @@
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
@@ -52,8 +52,8 @@ protected boolean testSetSemanticsAgainstDuplicateElements() {
 		TestElement, RocksDBCachingPriorityQueueSet<TestElement>> newFactory() {
 
 		return (keyGroupId, numKeyGroups, keyExtractorFunction, elementComparator) -> {
-			ByteArrayDataOutputView outputStreamWithPos = new ByteArrayDataOutputView();
-			ByteArrayDataInputView inputStreamWithPos = new ByteArrayDataInputView();
+			DataOutputSerializer outputStreamWithPos = new DataOutputSerializer(128);
+			DataInputDeserializer inputStreamWithPos = new DataInputDeserializer();
 			int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups);
 			TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32);
 			return new RocksDBCachingPriorityQueueSet<>(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 483b8fdd1dc76..942d85cf5ea10 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -18,7 +18,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.TestLogger;
@@ -112,30 +112,30 @@ private void testClipDBWithKeyGroupRangeHelper(
 		int currentGroupRangeStart = currentGroupRange.getStartKeyGroup();
 		int currentGroupRangeEnd = currentGroupRange.getEndKeyGroup();
 
-		ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(32);
+		DataOutputSerializer outputView = new DataOutputSerializer(32);
 		for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
 			for (int j = 0; j < 100; ++j) {
-				outputView.reset();
+				outputView.clear();
 				RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 				RocksDBKeySerializationUtils.writeKey(
 					j,
 					IntSerializer.INSTANCE,
 					outputView,
 					false);
-				rocksDB.put(columnFamilyHandle, outputView.toByteArray(), String.valueOf(j).getBytes());
+				rocksDB.put(columnFamilyHandle, outputView.getCopyOfBuffer(), String.valueOf(j).getBytes());
 			}
 		}
 
 		for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
 			for (int j = 0; j < 100; ++j) {
-				outputView.reset();
+				outputView.clear();
 				RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 				RocksDBKeySerializationUtils.writeKey(
 					j,
 					IntSerializer.INSTANCE,
 					outputView,
 					false);
-				byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
+				byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
 				Assert.assertEquals(String.valueOf(j), new String(value));
 			}
 		}
@@ -149,14 +149,14 @@ private void testClipDBWithKeyGroupRangeHelper(
 
 		for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
 			for (int j = 0; j < 100; ++j) {
-				outputView.reset();
+				outputView.clear();
 				RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 				RocksDBKeySerializationUtils.writeKey(
 					j,
 					IntSerializer.INSTANCE,
 					outputView,
 					false);
-				byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
+				byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
 				if (targetGroupRange.contains(i)) {
 					Assert.assertEquals(String.valueOf(j), new String(value));
 				} else {
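Reviewer note, not part of the patch: unlike ByteArrayDataOutputView's no-arg constructor, DataOutputSerializer requires an initial capacity (128 and 32 above). The internal buffer grows on demand, so the value is a sizing hint rather than a limit, as this invented stand-alone sketch shows:

	import org.apache.flink.core.memory.DataOutputSerializer;

	public class GrowingBufferSketch {
		public static void main(String[] args) throws Exception {
			DataOutputSerializer out = new DataOutputSerializer(4); // deliberately tiny
			for (int i = 0; i < 1000; i++) {
				out.writeInt(i); // exceeds 4 bytes immediately; the buffer resizes itself
			}
			System.out.println(out.length()); // 4000 bytes written, no overflow
		}
	}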
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
index d92bef5e9606f..66c13a9a05fef 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
@@ -19,11 +19,11 @@
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -63,19 +63,19 @@ public void testKeyGroupSerializationAndDeserialization() throws Exception {
 
 	@Test
 	public void testKeySerializationAndDeserialization() throws Exception {
-		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
-		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
+		final DataOutputSerializer outputView = new DataOutputSerializer(8);
+		final DataInputDeserializer inputView = new DataInputDeserializer();
 
 		// test for key
 		for (int orgKey = 0; orgKey < 100; ++orgKey) {
-			outputView.reset();
+			outputView.clear();
 			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, false);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgKey, deserializedKey);
 
 			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgKey, deserializedKey);
 		}
@@ -83,18 +83,18 @@ public void testKeySerializationAndDeserialization() throws Exception {
 
 	@Test
 	public void testNamespaceSerializationAndDeserialization() throws Exception {
-		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
-		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
+		final DataOutputSerializer outputView = new DataOutputSerializer(8);
+		final DataInputDeserializer inputView = new DataInputDeserializer();
 
 		for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
-			outputView.reset();
+			outputView.clear();
 			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, false);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgNamespace, deserializedNamepsace);
 
 			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, true);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgNamespace, deserializedNamepsace);
 		}
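Reviewer note, not part of the patch: getCopyOfBuffer() copies only the bytes written since the last clear(), which is why the reused outputView in the loops above cannot leak data between iterations. A stand-alone sketch with an invented class name and values:

	import org.apache.flink.core.memory.DataInputDeserializer;
	import org.apache.flink.core.memory.DataOutputSerializer;

	public class ReuseIsolationSketch {
		public static void main(String[] args) throws Exception {
			DataOutputSerializer out = new DataOutputSerializer(8);

			out.writeLong(0xDEADBEEFL);
			System.out.println(out.getCopyOfBuffer().length); // 8

			out.clear();
			out.writeByte(7);
			byte[] second = out.getCopyOfBuffer();
			System.out.println(second.length); // 1, not 8: earlier bytes are excluded

			DataInputDeserializer in = new DataInputDeserializer(second);
			System.out.println(in.readByte()); // 7
		}
	}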
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index e042ebd0609af..398df3f00c490 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -24,7 +24,7 @@
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -105,7 +105,7 @@ void testIteratorHelper(
 			testState.update(String.valueOf(i));
 		}
 
-		ByteArrayDataOutputView outputStream = new ByteArrayDataOutputView(8);
+		DataOutputSerializer outputStream = new DataOutputSerializer(8);
 		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		RocksDBKeySerializationUtils.writeNameSpace(
 			namespace,
@@ -113,7 +113,7 @@ void testIteratorHelper(
 			outputStream,
 			ambiguousKeyPossible);
 
-		byte[] nameSpaceBytes = outputStream.toByteArray();
+		byte[] nameSpaceBytes = outputStream.getCopyOfBuffer();
 
 		try (
 			ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
index 1a8bc58f7bcf1..0b082ff180e80 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
@@ -162,7 +162,7 @@ public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
 				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
 			}
 			else {
-				deser.setBuffer(serializedData, 0, serializedData.length);
+				deser.setBuffer(serializedData);
 			}
 
 			final Set<T> ids = new HashSet<>(checkpoint.getNumIds());
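Reviewer note, not part of the patch: the new single-argument setBuffer(byte[]) used above is shorthand for setBuffer(data, 0, data.length) and keeps one DataInputDeserializer reusable across checkpoints. A stand-alone sketch with invented class name and payloads:

	import org.apache.flink.core.memory.DataInputDeserializer;
	import org.apache.flink.core.memory.DataOutputSerializer;

	public class DeserializerReuseSketch {
		public static void main(String[] args) throws Exception {
			DataOutputSerializer out = new DataOutputSerializer(16);
			out.writeUTF("checkpoint-1");
			byte[] first = out.getCopyOfBuffer();
			out.clear();
			out.writeUTF("checkpoint-2");
			byte[] second = out.getCopyOfBuffer();

			DataInputDeserializer deser = new DataInputDeserializer();
			for (byte[] serializedData : new byte[][] {first, second}) {
				deser.setBuffer(serializedData); // whole-array overload, no offset/length needed
				System.out.println(deser.readUTF());
			}
		}
	}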