Skip to content

Commit

Permalink
[FLINK-13304][table-runtime-blink] Fix implementation of getString an…
Browse files Browse the repository at this point in the history
…d getBinary method in NestedRow

This closes apache#9139
  • Loading branch information
tsreaper authored and wuchong committed Jul 22, 2019
1 parent 6961d9b commit e53a85d
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Its memory storage structure and {@link BinaryRow} exactly the same, the only different is it supports
* all bytes in variable MemorySegments.
* Its memory storage structure is exactly the same as that of {@link BinaryRow}.
* The only difference is that, as {@link NestedRow} is used
* to store row value in the variable-length part of {@link BinaryRow},
* every field (including both fixed-length part and variable-length part) of {@link NestedRow}
* may cross the boundary of a segment, while the fixed-length part of {@link BinaryRow}
* must fit into its first memory segment.
*/
public final class NestedRow extends BinaryFormat implements BaseRow {

Expand Down Expand Up @@ -219,7 +223,7 @@ public double getDouble(int pos) {
// Diff hunk of NestedRow#getString rendered without +/- markers: the two consecutive
// `offsetAndLen` declarations below appear to be the removed and the added version of one line
// (the hunk header reports 7 lines before and 7 lines after the change).
public BinaryString getString(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
// Presumably the old (removed) line: reads the 8-byte offset-and-length word from segments[0] only.
final long offsetAndLen = segments[0].getLong(fieldOffset);
// Presumably the new (added) line: reads the same word via SegmentsUtil, handing it the whole segment array.
final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}

Expand Down Expand Up @@ -247,7 +251,7 @@ public <T> BinaryGeneric<T> getGeneric(int pos) {
// Diff hunk of NestedRow#getBinary rendered without +/- markers: the two consecutive
// `offsetAndLen` declarations below appear to be the removed and the added version of one line
// (the hunk header reports 7 lines before and 7 lines after the change).
public byte[] getBinary(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
// Presumably the old (removed) line: reads the 8-byte offset-and-length word from segments[0] only.
final long offsetAndLen = segments[0].getLong(fieldOffset);
// Presumably the new (added) line: reads the same word via SegmentsUtil, handing it the whole segment array.
final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.dataformat;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

/**
 * Utils for testing data formats.
 *
 * <p>Contains only static helpers and a nested value type; it is not meant to be instantiated.
 */
final class DataFormatTestUtil {

	private DataFormatTestUtil() {
		// static helpers only
	}

	/**
	 * Splits the given byte array into two memory segments.
	 *
	 * <p>The first segment holds {@code baseOffset} leading padding bytes followed by the first
	 * {@code (bytes.length + 1) / 2} bytes of the input; the second segment holds the remaining
	 * bytes starting at position 0. Both segments are allocated with the same size
	 * ({@code (bytes.length + 1) / 2 + baseOffset}), so the second one may have unused trailing bytes.
	 *
	 * @param bytes the data to distribute over the two segments
	 * @param baseOffset number of unused bytes placed before the data in the first segment
	 * @return an array of exactly two segments containing the data
	 */
	static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
		// Number of data bytes stored in the first segment (the larger half for odd lengths).
		int firstLen = (bytes.length + 1) / 2;
		int newSize = firstLen + baseOffset;

		MemorySegment[] ret = new MemorySegment[2];
		ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
		ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);

		ret[0].put(baseOffset, bytes, 0, firstLen);
		ret[1].put(0, bytes, firstLen, bytes.length - firstLen);
		return ret;
	}

	/**
	 * A simple class for testing generic type getting / setting on data formats.
	 *
	 * <p>Equality is value based on both fields; {@link #hashCode()} is kept consistent with
	 * {@link #equals(Object)}.
	 */
	static class MyObj {
		// NOTE(review): fields are left public and non-final, as in the original; the generic
		// serializer used by the tests may need to populate them reflectively - confirm before tightening.
		public int i;
		public double j;

		MyObj(int i, double j) {
			this.i = i;
			this.j = j;
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}

			MyObj myObj = (MyObj) o;

			return i == myObj.i && Double.compare(myObj.j, j) == 0;
		}

		@Override
		public int hashCode() {
			// Double.hashCode is based on doubleToLongBits, which agrees with the
			// Double.compare-based equality above (NaN == NaN, 0.0 != -0.0).
			return 31 * Integer.hashCode(i) + Double.hashCode(j);
		}

		@Override
		public String toString() {
			return "MyObj{" + "i=" + i + ", j=" + j + '}';
		}
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.dataformat;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.typeutils.BaseRowSerializer;

import org.junit.Test;

import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
import static org.apache.flink.table.dataformat.DataFormatTestUtil.splitBytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Test for {@link NestedRow}s.
 *
 * <p>Covers reading a nested row that lives in a single memory segment, reading the same row
 * after its bytes were split over two segments, and reading a row nested inside another nested row.
 */
public class NestedRowTest {

	/** Reads the five-field nested row while the outer row is backed by a single segment. */
	@Test
	public void testNestedRowWithOneSegment() {
		BinaryRow row = getBinaryRow();
		assertFiveFieldNestedRow(row.getRow(0, 5));
	}

	/** Reads the same nested row after re-pointing the outer row at two segments with a base offset of 3. */
	@Test
	public void testNestedRowWithMultipleSegments() {
		BinaryRow row = getBinaryRow();

		MemorySegment[] segments = splitBytes(row.getSegments()[0].getHeapMemory(), 3);
		row.pointTo(segments, 3, row.getSizeInBytes());

		assertFiveFieldNestedRow(row.getRow(0, 5));
	}

	/** Writes a row inside a row inside a row and checks every layer can be read back. */
	@Test
	public void testNestInNestedRow() {
		// layer1
		GenericRow gRow = new GenericRow(4);
		gRow.setField(0, 1);
		gRow.setField(1, 5L);
		gRow.setField(2, BinaryString.fromString("12345678"));
		gRow.setField(3, null);

		// layer2
		BaseRowSerializer serializer = new BaseRowSerializer(
			new LogicalType[]{
				DataTypes.INT().getLogicalType(),
				DataTypes.BIGINT().getLogicalType(),
				DataTypes.STRING().getLogicalType(),
				DataTypes.STRING().getLogicalType()
			},
			new TypeSerializer[]{
				IntSerializer.INSTANCE,
				LongSerializer.INSTANCE,
				StringSerializer.INSTANCE,
				StringSerializer.INSTANCE
			});
		BinaryRow row = new BinaryRow(2);
		BinaryRowWriter writer = new BinaryRowWriter(row);
		writer.writeString(0, BinaryString.fromString("hahahahafff"));
		writer.writeRow(1, gRow, serializer);
		writer.complete();

		// layer3
		BinaryRow row2 = new BinaryRow(1);
		BinaryRowWriter writer2 = new BinaryRowWriter(row2);
		writer2.writeRow(0, row, null);
		writer2.complete();

		// verify: the bytes of the nested layer2 row, viewed as a BinaryRow, equal the row written
		NestedRow layer2Row = (NestedRow) row2.getRow(0, 2);
		BinaryRow reinterpreted = new BinaryRow(2);
		reinterpreted.pointTo(layer2Row.getSegments(), layer2Row.getOffset(),
			layer2Row.getSizeInBytes());
		assertEquals(row, reinterpreted);

		assertEquals(BinaryString.fromString("hahahahafff"), row2.getRow(0, 2).getString(0));
		BaseRow nestedRow = row2.getRow(0, 2).getRow(1, 4);
		assertEquals(1, nestedRow.getInt(0));
		assertEquals(5L, nestedRow.getLong(1));
		assertEquals(BinaryString.fromString("12345678"), nestedRow.getString(2));
		assertTrue(nestedRow.isNullAt(3));
	}

	/**
	 * Asserts that the given row holds the five values written by {@link #getBinaryRow()}:
	 * {@code 1}, {@code 5L}, {@code "12345678"}, {@code null} and {@code MyObj(15, 5)}.
	 *
	 * @param nestedRow the nested row read back from the outer row
	 */
	private static void assertFiveFieldNestedRow(BaseRow nestedRow) {
		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());

		// JUnit's contract is assertEquals(expected, actual).
		assertEquals(1, nestedRow.getInt(0));
		assertEquals(5L, nestedRow.getLong(1));
		assertEquals(BinaryString.fromString("12345678"), nestedRow.getString(2));
		assertTrue(nestedRow.isNullAt(3));
		assertEquals(new MyObj(15, 5),
			BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer));
	}

	/**
	 * Builds a one-field {@link BinaryRow} whose only field is a nested row with five fields
	 * (int, long, string, null string, generic {@code MyObj}).
	 *
	 * @return the outer row containing the nested row at position 0
	 */
	private BinaryRow getBinaryRow() {
		BinaryRow row = new BinaryRow(1);
		BinaryRowWriter writer = new BinaryRowWriter(row);

		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
		GenericRow gRow = new GenericRow(5);
		gRow.setField(0, 1);
		gRow.setField(1, 5L);
		gRow.setField(2, BinaryString.fromString("12345678"));
		gRow.setField(3, null);
		gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), genericSerializer));

		BaseRowSerializer serializer = new BaseRowSerializer(
			new LogicalType[]{
				DataTypes.INT().getLogicalType(),
				DataTypes.BIGINT().getLogicalType(),
				DataTypes.STRING().getLogicalType(),
				DataTypes.STRING().getLogicalType(),
				DataTypes.ANY(info).getLogicalType()
			},
			new TypeSerializer[]{
				IntSerializer.INSTANCE,
				LongSerializer.INSTANCE,
				StringSerializer.INSTANCE,
				StringSerializer.INSTANCE,
				genericSerializer
			});
		writer.writeRow(0, gRow, serializer);
		writer.complete();

		return row;
	}
}

0 comments on commit e53a85d

Please sign in to comment.