[FLINK-10028][core] Introduce reusable ByteArrayData[Input/Output]View as adapter between Data[Input/Output]View and byte-arrays
StefanRRichter committed Aug 2, 2018
1 parent a20fd1d commit fffd2e8
Showing 5 changed files with 145 additions and 36 deletions.
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.memory;

import javax.annotation.Nonnull;

/**
* Reusable adapter to {@link DataInputView} that operates on given byte-arrays.
*/
public class ByteArrayDataInputView extends DataInputViewStreamWrapper {

	@Nonnull
	private final ByteArrayInputStreamWithPos inStreamWithPos;

	public ByteArrayDataInputView() {
		super(new ByteArrayInputStreamWithPos());
		this.inStreamWithPos = (ByteArrayInputStreamWithPos) in;
	}

	public ByteArrayDataInputView(@Nonnull byte[] buffer) {
		this(buffer, 0, buffer.length);
	}

	public ByteArrayDataInputView(@Nonnull byte[] buffer, int offset, int length) {
		this();
		setData(buffer, offset, length);
	}

	public int getPosition() {
		return inStreamWithPos.getPosition();
	}

	public void setPosition(int pos) {
		inStreamWithPos.setPosition(pos);
	}

	public void setData(@Nonnull byte[] buffer, int offset, int length) {
		inStreamWithPos.setBuffer(buffer, offset, length);
	}
}
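
For illustration only (not part of this commit), a minimal sketch of how the reusable input view can be re-pointed at several byte arrays without allocating a new stream/wrapper pair per record; the serializedRecords collection and the assumption that each record starts with an int are hypothetical:

	ByteArrayDataInputView in = new ByteArrayDataInputView();
	for (byte[] serialized : serializedRecords) {
		// Re-point the same view at the next array instead of creating a new wrapper.
		in.setData(serialized, 0, serialized.length);
		int header = in.readInt(); // readInt() is inherited via DataInputViewStreamWrapper
		// ... deserialize the rest of the record ...
	}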
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.memory;

import javax.annotation.Nonnull;

/**
* Adapter to {@link DataOutputView} that operates on a byte-array and offers read/write access to the current position.
*/
public class ByteArrayDataOutputView extends DataOutputViewStreamWrapper {

	@Nonnull
	private final ByteArrayOutputStreamWithPos outputStreamWithPos;

	public ByteArrayDataOutputView() {
		this(64);
	}

	public ByteArrayDataOutputView(int initialSize) {
		super(new ByteArrayOutputStreamWithPos(initialSize));
		this.outputStreamWithPos = (ByteArrayOutputStreamWithPos) out;
	}

	public void reset() {
		outputStreamWithPos.reset();
	}

	@Nonnull
	public byte[] toByteArray() {
		return outputStreamWithPos.toByteArray();
	}

	public int getPosition() {
		return outputStreamWithPos.getPosition();
	}

	public void setPosition(int position) {
		outputStreamWithPos.setPosition(position);
	}

	@Nonnull
	public byte[] getInternalBufferReference() {
		return outputStreamWithPos.getBuf();
	}
}
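
A rough round-trip sketch (again not part of the commit) of how the two adapters pair up; the values written here are arbitrary:

	// Serialize into a reusable, growing byte buffer.
	ByteArrayDataOutputView out = new ByteArrayDataOutputView(32);
	out.writeInt(42);
	out.writeUTF("hello");
	byte[] bytes = out.toByteArray();
	out.reset(); // the same buffer can be reused for the next element

	// Deserialize from the produced byte array.
	ByteArrayDataInputView in = new ByteArrayDataInputView(bytes);
	int i = in.readInt();    // 42
	String s = in.readUTF(); // "hello"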
@@ -19,10 +19,8 @@
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.util.CloseableIterator;
@@ -84,19 +82,13 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull
private final byte[] groupPrefixBytes;

/** Output stream that helps to serialize elements. */
/** Output view that helps to serialize elements. */
@Nonnull
private final ByteArrayOutputStreamWithPos outputStream;
private final ByteArrayDataOutputView outputView;

/** Output view that helps to serialize elements, must wrap the output stream. */
/** Input view that helps to de-serialize elements. */
@Nonnull
private final DataOutputViewStreamWrapper outputView;

@Nonnull
private final ByteArrayInputStreamWithPos inputStream;

@Nonnull
private final DataInputViewStreamWrapper inputView;
private final ByteArrayDataInputView inputView;

/** In memory cache that holds a head-subset of the elements stored in RocksDB. */
@Nonnull
@@ -122,20 +114,18 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull RocksDB db,
@Nonnull ColumnFamilyHandle columnFamilyHandle,
@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
@Nonnull ByteArrayOutputStreamWithPos outputStream,
@Nonnull ByteArrayInputStreamWithPos inputStream,
@Nonnull ByteArrayDataOutputView outputStream,
@Nonnull ByteArrayDataInputView inputStream,
@Nonnull RocksDBWriteBatchWrapper batchWrapper,
@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
this.db = db;
this.columnFamilyHandle = columnFamilyHandle;
this.byteOrderProducingSerializer = byteOrderProducingSerializer;
this.outputStream = outputStream;
this.inputStream = inputStream;
this.batchWrapper = batchWrapper;
this.allElementsInCache = false;
this.outputView = new DataOutputViewStreamWrapper(outputStream);
this.inputView = new DataInputViewStreamWrapper(inputStream);
this.outputView = outputStream;
this.inputView = inputStream;
this.orderedCache = orderedByteArraySetCache;
this.allElementsInCache = false;
this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
this.seekHint = groupPrefixBytes;
this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
@@ -367,24 +357,24 @@ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
@Nonnull
private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {

outputStream.reset();
outputView.reset();

try {
RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
} catch (IOException e) {
throw new FlinkRuntimeException("Could not write key-group bytes.", e);
}

return outputStream.toByteArray();
return outputView.toByteArray();
}

@Nonnull
private byte[] serializeElement(@Nonnull E element) {
try {
outputStream.reset();
outputView.reset();
outputView.write(groupPrefixBytes);
byteOrderProducingSerializer.serialize(element, outputView);
return outputStream.toByteArray();
return outputView.toByteArray();
} catch (IOException e) {
throw new FlinkRuntimeException("Error while serializing the element.", e);
}
@@ -393,7 +383,7 @@ private byte[] serializeElement(@Nonnull E element) {
@Nonnull
private E deserializeElement(@Nonnull byte[] bytes) {
try {
inputStream.setBuffer(bytes, groupPrefixBytes.length, bytes.length);
inputView.setData(bytes, groupPrefixBytes.length, bytes.length);
return byteOrderProducingSerializer.deserialize(inputView);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the element.", e);
@@ -39,6 +39,8 @@
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
@@ -2650,15 +2652,15 @@ class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {

/** A shared buffer to serialize elements for the priority queue. */
@Nonnull
private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
private final ByteArrayDataOutputView sharedElementOutView;

/** A shared buffer to de-serialize elements for the priority queue. */
@Nonnull
private final ByteArrayInputStreamWithPos elementSerializationInStream;
private final ByteArrayDataInputView sharedElementInView;

RocksDBPriorityQueueSetFactory() {
this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
this.elementSerializationInStream = new ByteArrayInputStreamWithPos();
this.sharedElementOutView = new ByteArrayDataOutputView();
this.sharedElementInView = new ByteArrayDataInputView();
}

@Nonnull
@@ -2689,8 +2691,8 @@ public RocksDBCachingPriorityQueueSet<T> create(
db,
columnFamilyHandle,
byteOrderedElementSerializer,
elementSerializationOutStream,
elementSerializationInStream,
sharedElementOutView,
sharedElementInView,
writeBatchWrapper,
orderedSetCache
);
@@ -18,8 +18,8 @@

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
@@ -52,8 +52,8 @@ protected boolean testSetSemanticsAgainstDuplicateElements() {
TestElement, RocksDBCachingPriorityQueueSet<TestElement>> newFactory() {

return (keyGroupId, numKeyGroups, keyExtractorFunction, elementComparator) -> {
ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos();
ByteArrayDataOutputView outputStreamWithPos = new ByteArrayDataOutputView();
ByteArrayDataInputView inputStreamWithPos = new ByteArrayDataInputView();
int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups);
TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32);
return new RocksDBCachingPriorityQueueSet<>(
