Skip to content

Commit

Permalink
[FLINK-10176][state] Replace ByteArrayData[Input|Output]View with (en…
Browse files Browse the repository at this point in the history
…hanced) Data[Output|InputDe]Serializer

This closes apache#6583.
  • Loading branch information
StefanRRichter committed Aug 22, 2018
1 parent 2b6beed commit 3fd6587
Show file tree
Hide file tree
Showing 22 changed files with 211 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic,
V value = null;

if (messageKey != null) {
inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
inputDeserializer.setBuffer(messageKey);
key = keySerializer.deserialize(inputDeserializer);
}
if (message != null) {
inputDeserializer.setBuffer(message, 0, message.length);
inputDeserializer.setBuffer(message);
value = valueSerializer.deserialize(inputDeserializer);
}
return new Tuple2<>(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, TypeSeria
@Override
public T deserialize(byte[] message) {
if (dis != null) {
dis.setBuffer(message, 0, message.length);
dis.setBuffer(message);
} else {
dis = new DataInputDeserializer(message, 0, message.length);
dis = new DataInputDeserializer(message);
}

try {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.core.memory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
Expand All @@ -29,6 +32,7 @@
*/
public class DataInputDeserializer implements DataInputView, java.io.Serializable {

private static final byte[] EMPTY = new byte[0];
private static final long serialVersionUID = 1L;

// ------------------------------------------------------------------------
Expand All @@ -41,25 +45,27 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl

// ------------------------------------------------------------------------

public DataInputDeserializer() {}
public DataInputDeserializer() {
setBuffer(EMPTY);
}

public DataInputDeserializer(byte[] buffer) {
setBuffer(buffer, 0, buffer.length);
public DataInputDeserializer(@Nonnull byte[] buffer) {
setBufferInternal(buffer, 0, buffer.length);
}

public DataInputDeserializer(byte[] buffer, int start, int len) {
public DataInputDeserializer(@Nonnull byte[] buffer, int start, int len) {
setBuffer(buffer, start, len);
}

public DataInputDeserializer(ByteBuffer buffer) {
public DataInputDeserializer(@Nonnull ByteBuffer buffer) {
setBuffer(buffer);
}

// ------------------------------------------------------------------------
// Changing buffers
// ------------------------------------------------------------------------

public void setBuffer(ByteBuffer buffer) {
public void setBuffer(@Nonnull ByteBuffer buffer) {
if (buffer.hasArray()) {
this.buffer = buffer.array();
this.position = buffer.arrayOffset() + buffer.position();
Expand All @@ -76,15 +82,20 @@ public void setBuffer(ByteBuffer buffer) {
}
}

public void setBuffer(byte[] buffer, int start, int len) {
if (buffer == null) {
throw new NullPointerException();
}
public void setBuffer(@Nonnull byte[] buffer, int start, int len) {

if (start < 0 || len < 0 || start + len > buffer.length) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("Invalid bounds.");
}

setBufferInternal(buffer, start, len);
}

public void setBuffer(@Nonnull byte[] buffer) {
setBufferInternal(buffer, 0, buffer.length);
}

private void setBufferInternal(@Nonnull byte[] buffer, int start, int len) {
this.buffer = buffer;
this.position = start;
this.end = start + len;
Expand Down Expand Up @@ -144,12 +155,12 @@ public float readFloat() throws IOException {
}

@Override
public void readFully(byte[] b) throws IOException {
public void readFully(@Nonnull byte[] b) throws IOException {
readFully(b, 0, b.length);
}

@Override
public void readFully(byte[] b, int off, int len) throws IOException {
public void readFully(@Nonnull byte[] b, int off, int len) throws IOException {
if (len >= 0) {
if (off <= b.length - len) {
if (this.position <= this.end - len) {
Expand All @@ -161,7 +172,7 @@ public void readFully(byte[] b, int off, int len) throws IOException {
} else {
throw new ArrayIndexOutOfBoundsException();
}
} else if (len < 0) {
} else {
throw new IllegalArgumentException("Length may not be negative.");
}
}
Expand All @@ -182,6 +193,7 @@ public int readInt() throws IOException {
}
}

@Nullable
@Override
public String readLine() throws IOException {
if (this.position < this.end) {
Expand Down Expand Up @@ -229,6 +241,7 @@ public short readShort() throws IOException {
}
}

@Nonnull
@Override
public String readUTF() throws IOException {
int utflen = readUnsignedShort();
Expand Down Expand Up @@ -319,7 +332,7 @@ public int readUnsignedShort() throws IOException {
}

@Override
public int skipBytes(int n) throws IOException {
public int skipBytes(int n) {
if (this.position <= this.end - n) {
this.position += n;
return n;
Expand All @@ -340,10 +353,7 @@ public void skipBytesToRead(int numBytes) throws IOException {
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (b == null){
throw new NullPointerException("Byte array b cannot be null.");
}
public int read(@Nonnull byte[] b, int off, int len) throws IOException {

if (off < 0){
throw new IndexOutOfBoundsException("Offset cannot be negative.");
Expand All @@ -370,10 +380,14 @@ public int read(byte[] b, int off, int len) throws IOException {
}

@Override
public int read(byte[] b) throws IOException {
public int read(@Nonnull byte[] b) throws IOException {
return read(b, 0, b.length);
}

public int getPosition() {
return position;
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer;
import org.apache.flink.util.FlinkRuntimeException;
Expand All @@ -34,12 +34,12 @@
abstract class TtlStateSnapshotTransformer<T> implements CollectionStateSnapshotTransformer<T> {
private final TtlTimeProvider ttlTimeProvider;
final long ttl;
private final ByteArrayDataInputView div;
private final DataInputDeserializer div;

TtlStateSnapshotTransformer(@Nonnull TtlTimeProvider ttlTimeProvider, long ttl) {
this.ttlTimeProvider = ttlTimeProvider;
this.ttl = ttl;
this.div = new ByteArrayDataInputView();
this.div = new DataInputDeserializer();
}

<V> TtlValue<V> filterTtlValue(TtlValue<V> value) {
Expand All @@ -55,7 +55,7 @@ boolean expired(long ts) {
}

long deserializeTs(byte[] value) throws IOException {
div.setData(value, 0, Long.BYTES);
div.setBuffer(value, 0, Long.BYTES);
return LongSerializer.INSTANCE.deserialize(div);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ SV getInternal(byte[] key) {
if (valueBytes == null) {
return null;
}
dataInputView.setData(valueBytes);
dataInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
Expand Down
Loading

0 comments on commit 3fd6587

Please sign in to comment.