Skip to content

Commit

Permalink
[FLINK-31250][formats][parquet] Parquet format supports MULTISET type
Browse files — Browse the repository at this point in the history
This closes apache#22043
  • Loading branch information
SteNicholas committed Feb 28, 2023
1 parent 150f276 commit 1610141
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 10 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -128,6 +130,10 @@ private FieldWriter createWriter(LogicalType t, Type type) {
&& logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return new MapWriter(
((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
} else if (t instanceof MultisetType
&& logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return new MapWriter(
((MultisetType) t).getElementType(), new IntType(false), groupType);
} else if (t instanceof RowType && type instanceof GroupType) {
return new RowWriter((RowType) t, groupType);
} else {
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.parquet.schema.ConversionPatterns;
Expand Down Expand Up @@ -125,6 +127,14 @@ private static Type convertToParquetType(
MAP_REPEATED_NAME,
convertToParquetType("key", mapType.getKeyType()),
convertToParquetType("value", mapType.getValueType()));
case MULTISET:
MultisetType multisetType = (MultisetType) type;
return ConversionPatterns.mapType(
repetition,
name,
MAP_REPEATED_NAME,
convertToParquetType("key", multisetType.getElementType()),
convertToParquetType("value", new IntType(false)));
case ROW:
RowType rowType = (RowType) type;
return new GroupType(repetition, name, convertToParquetTypes(rowType));
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -363,21 +364,38 @@ public static ColumnReader createColumnReader(
fieldType);
case MAP:
MapType mapType = (MapType) fieldType;
ArrayColumnReader keyReader =
ArrayColumnReader mapKeyReader =
new ArrayColumnReader(
descriptors.get(0),
pages.getPageReader(descriptors.get(0)),
isUtcTimestamp,
descriptors.get(0).getPrimitiveType(),
new ArrayType(mapType.getKeyType()));
ArrayColumnReader valueReader =
ArrayColumnReader mapValueReader =
new ArrayColumnReader(
descriptors.get(1),
pages.getPageReader(descriptors.get(1)),
isUtcTimestamp,
descriptors.get(1).getPrimitiveType(),
new ArrayType(mapType.getValueType()));
return new MapColumnReader(keyReader, valueReader);
return new MapColumnReader(mapKeyReader, mapValueReader);
case MULTISET:
MultisetType multisetType = (MultisetType) fieldType;
ArrayColumnReader multisetKeyReader =
new ArrayColumnReader(
descriptors.get(0),
pages.getPageReader(descriptors.get(0)),
isUtcTimestamp,
descriptors.get(0).getPrimitiveType(),
new ArrayType(multisetType.getElementType()));
ArrayColumnReader multisetValueReader =
new ArrayColumnReader(
descriptors.get(1),
pages.getPageReader(descriptors.get(1)),
isUtcTimestamp,
descriptors.get(1).getPrimitiveType(),
new ArrayType(new IntType(false)));
return new MapColumnReader(multisetKeyReader, multisetValueReader);
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = type.asGroupType();
Expand Down Expand Up @@ -508,19 +526,36 @@ public static WritableColumnVector createWritableColumnVector(
depth));
case MAP:
MapType mapType = (MapType) fieldType;
GroupType repeatedType = type.asGroupType().getType(0).asGroupType();
GroupType mapRepeatedType = type.asGroupType().getType(0).asGroupType();
return new HeapMapVector(
batchSize,
createWritableColumnVector(
batchSize,
mapType.getKeyType(),
repeatedType.getType(0),
mapRepeatedType.getType(0),
descriptors,
depth + 2),
createWritableColumnVector(
batchSize,
mapType.getValueType(),
repeatedType.getType(1),
mapRepeatedType.getType(1),
descriptors,
depth + 2));
case MULTISET:
MultisetType multisetType = (MultisetType) fieldType;
GroupType multisetRepeatedType = type.asGroupType().getType(0).asGroupType();
return new HeapMapVector(
batchSize,
createWritableColumnVector(
batchSize,
multisetType.getElementType(),
multisetRepeatedType.getType(0),
descriptors,
depth + 2),
createWritableColumnVector(
batchSize,
new IntType(false),
multisetRepeatedType.getType(1),
descriptors,
depth + 2));
case ROW:
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
Expand Down Expand Up @@ -76,7 +77,7 @@
/** Test for {@link ParquetColumnarRowSplitReader}. */
class ParquetColumnarRowSplitReaderTest {

private static final int FIELD_NUMBER = 33;
private static final int FIELD_NUMBER = 34;
private static final LocalDateTime BASE_TIME = LocalDateTime.now();

private static final RowType ROW_TYPE =
Expand Down Expand Up @@ -115,6 +116,7 @@ class ParquetColumnarRowSplitReaderTest {
new VarCharType(VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH)),
new MapType(new IntType(), new BooleanType()),
new MultisetType(new VarCharType(VarCharType.MAX_LENGTH)),
RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));

@TempDir File tmpDir;
Expand Down Expand Up @@ -218,7 +220,7 @@ private ParquetColumnarRowSplitReader createReader(
new String[] {
"f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12",
"f13", "f14", "f15", "f16", "f17", "f18", "f19", "f20", "f21", "f22", "f23",
"f24", "f25", "f26", "f27", "f28", "f29", "f30", "f31", "f32"
"f24", "f25", "f26", "f27", "f28", "f29", "f30", "f31", "f32", "f33"
},
VectorizedColumnBatch::new,
500,
Expand Down Expand Up @@ -276,6 +278,7 @@ private int readSplitAndCheck(
assertThat(row.isNullAt(30)).isTrue();
assertThat(row.isNullAt(31)).isTrue();
assertThat(row.isNullAt(32)).isTrue();
assertThat(row.isNullAt(33)).isTrue();
} else {
assertThat(row.getString(0)).hasToString("" + v);
assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
Expand Down Expand Up @@ -330,8 +333,9 @@ private int readSplitAndCheck(
.isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
assertThat(row.getRow(32, 2).getString(0)).hasToString("" + v);
assertThat(row.getRow(32, 2).getInt(1)).isEqualTo(v.intValue());
assertThat(row.getMap(32).keyArray().getString(0)).hasToString("" + v);
assertThat(row.getRow(33, 2).getString(0)).hasToString("" + v);
assertThat(row.getRow(33, 2).getInt(1)).isEqualTo(v.intValue());
}
i++;
}
Expand All @@ -346,6 +350,9 @@ private RowData newRow(Integer v) {
Map<Integer, Boolean> f31 = new HashMap<>();
f31.put(v, v % 2 == 0);

Map<StringData, Integer> f32 = new HashMap<>();
f32.put(StringData.fromString("" + v), v);

return GenericRowData.of(
StringData.fromString("" + v),
v % 2 == 0,
Expand Down Expand Up @@ -402,6 +409,7 @@ private RowData newRow(Integer v) {
}),
new GenericMapData(f30),
new GenericMapData(f31),
new GenericMapData(f32),
GenericRowData.of(StringData.fromString("" + v), v));
}

Expand Down

0 comments on commit 1610141

Please sign in to comment.