Skip to content

Commit

Permalink
[FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayDat…
Browse files Browse the repository at this point in the history
…a in converters

This closes apache#12542.
  • Loading branch information
zoudan authored and twalthr committed Jun 25, 2020
1 parent 49b5103 commit 581fabe
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private ArrayData toBinaryArrayData(E[] external) {
for (int pos = 0; pos < length; pos++) {
writeElement(pos, external[pos]);
}
return completeWriter();
return completeWriter().copy();
}

private E[] toJavaArray(ArrayData internal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ private ArrayData toBinaryArray(T[] value) {
}
}
reuseWriter.complete();
return reuseArray;
return reuseArray.copy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ private static DataFormatConverter getConverter(TypeInformation typeInfo) {
}

private static void test(TypeInformation typeInfo, Object value) {
test(typeInfo, value, null);
}

private static void test(TypeInformation typeInfo, Object value, Object anotherValue) {
DataFormatConverter converter = getConverter(typeInfo);
final Object innerValue = converter.toInternal(value);
if (anotherValue != null) {
converter.toInternal(anotherValue);
}

Assert.assertTrue(Arrays.deepEquals(
new Object[] {converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
new Object[] {converter.toExternal(innerValue)}, new Object[]{value}));
}

private static DataFormatConverter getConverter(DataType dataType) {
Expand Down Expand Up @@ -193,6 +202,7 @@ public void testTypes() {
test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] {null, null});
test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {null, null});
test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"});
test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"}, new String[] {"aa", "bb"});
test(new MapTypeInfo<>(Types.STRING, Types.INT), null);

HashMap<String, Integer> map = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ public static List<TestSpec> testData() {
null
)
})
.convertedToWithAnotherValue(
Row[].class,
new Row[] {
Row.of(null, null),
Row.of(new PojoWithImmutableFields(10, "Bob"), null)
})
);
}

Expand All @@ -369,6 +375,11 @@ public void testConversions() {

final Object internalValue = fromConverter.toInternalOrNull(from.getValue());

final Object anotherValue = testSpec.conversionsWithAnotherValue.get(from.getKey());
if (anotherValue != null) {
fromConverter.toInternalOrNull(anotherValue);
}

for (Map.Entry<Class<?>, Object> to : testSpec.conversions.entrySet()) {
final DataType toDataType = testSpec.dataType.bridgedTo(to.getKey());

Expand All @@ -395,12 +406,15 @@ private static class TestSpec {

private final Map<Class<?>, Object> conversions;

private final Map<Class<?>, Object> conversionsWithAnotherValue;

private @Nullable String expectedErrorMessage;

private TestSpec(String description, DataType dataType) {
this.description = description;
this.dataType = dataType;
this.conversions = new LinkedHashMap<>();
this.conversionsWithAnotherValue = new LinkedHashMap<>();
}

static TestSpec forDataType(AbstractDataType<?> dataType) {
Expand All @@ -420,6 +434,11 @@ <T> TestSpec convertedTo(Class<T> clazz, T value) {
return this;
}

<T> TestSpec convertedToWithAnotherValue(Class<T> clazz, T value) {
conversionsWithAnotherValue.put(clazz, value);
return this;
}

<T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> supplier) {
conversions.put(clazz, supplier.get());
return this;
Expand Down

0 comments on commit 581fabe

Please sign in to comment.