Skip to content

Commit

Permalink
[FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to …
Browse files Browse the repository at this point in the history
…convert between internal data format and java format

This closes apache#7904
  • Loading branch information
JingsongLi authored and KurtYoung committed Mar 6, 2019
1 parent 56afa2e commit e2579e3
Show file tree
Hide file tree
Showing 4 changed files with 1,298 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.type.InternalType;
import org.apache.flink.table.type.InternalTypes;
import org.apache.flink.table.util.SegmentsUtil;

import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
Expand All @@ -40,6 +42,7 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters {
private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
private static final int CHAR_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(char[].class);
private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
Expand All @@ -49,6 +52,34 @@ public static int calculateHeaderInBytes(int numFields) {
return 4 + ((numFields + 31) / 32) * 4;
}

/**
* It store real value when type is primitive.
* It store the length and offset of variable-length part when type is string, map, etc.
*/
public static int calculateFixLengthPartSize(InternalType type) {
if (type.equals(InternalTypes.BOOLEAN)) {
return 1;
} else if (type.equals(InternalTypes.BYTE)) {
return 1;
} else if (type.equals(InternalTypes.SHORT)) {
return 2;
} else if (type.equals(InternalTypes.INT)) {
return 4;
} else if (type.equals(InternalTypes.FLOAT)) {
return 4;
} else if (type.equals(InternalTypes.CHAR)) {
return 2;
} else if (type.equals(InternalTypes.DATE)) {
return 4;
} else if (type.equals(InternalTypes.TIME)) {
return 4;
} else {
// long, double is 8 bytes.
// It store the length and offset of variable-length part when type is string, map, etc.
return 8;
}
}

// The number of elements in this array
private int numElements;

Expand Down Expand Up @@ -315,6 +346,14 @@ public short[] toShortArray() {
return values;
}

public char[] toCharArray() {
checkNoNull();
char[] values = new char[numElements];
SegmentsUtil.copyToUnsafe(
segments, elementOffset, values, CHAR_ARRAY_OFFSET, numElements * 2);
return values;
}

public int[] toIntArray() {
checkNoNull();
int[] values = new int[numElements];
Expand Down Expand Up @@ -393,6 +432,10 @@ public static BinaryArray fromPrimitiveArray(short[] arr) {
return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
}

public static BinaryArray fromPrimitiveArray(char[] arr) {
return fromPrimitiveArray(arr, CHAR_ARRAY_OFFSET, arr.length, 2);
}

public static BinaryArray fromPrimitiveArray(int[] arr) {
return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
}
Expand Down
Loading

0 comments on commit e2579e3

Please sign in to comment.