Skip to content

Commit

Permalink
[FLINK-20054][formats] Fix ParquetInputFormat 3 level List handling
Browse files Browse the repository at this point in the history
This closes apache#13994
  • Loading branch information
HuangZhenQiu authored and JingsongLi committed Dec 2, 2020
1 parent 7818b7a commit c77e6f9
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,7 @@ public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldTy
// If the repeated field is a group with multiple fields, then its type is the element
// type and elements are required.
if (elementType.getFieldCount() > 1) {

for (Type type : elementType.getFields()) {
if (!type.isRepetition(Type.Repetition.REQUIRED)) {
throw new UnsupportedOperationException(
String.format("List field [%s] in List [%s] has to be required. ",
type.toString(), fieldType.getName()));
}
}
typeInfo = ObjectArrayTypeInfo.getInfoFor(
convertParquetTypeToTypeInfo(elementType));
typeInfo = convertGroupElementToArrayTypeInfo(parquetGroupType, elementType);
} else {
Type internalType = elementType.getType(0);
if (internalType.isPrimitive()) {
Expand All @@ -250,9 +241,7 @@ public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldTy
typeInfo = ObjectArrayTypeInfo.getInfoFor(
convertParquetTypeToTypeInfo(internalType));
} else {
throw new UnsupportedOperationException(
String.format("Unrecgonized List schema [%s] according to Parquet"
+ " standard", parquetGroupType.toString()));
typeInfo = convertGroupElementToArrayTypeInfo(parquetGroupType, tupleGroup);
}
}
}
Expand Down Expand Up @@ -306,6 +295,18 @@ public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldTy
return typeInfo;
}

private static ObjectArrayTypeInfo convertGroupElementToArrayTypeInfo(GroupType arrayFieldType, GroupType elementType) {
	// Per the Parquet list spec, every field of a multi-field element group must be REQUIRED;
	// reject the schema otherwise before converting.
	for (Type elementField : elementType.getFields()) {
		if (!elementField.isRepetition(Type.Repetition.REQUIRED)) {
			throw new UnsupportedOperationException(
				String.format("List field [%s] in List [%s] has to be required. ",
					elementField.toString(), arrayFieldType.getName()));
		}
	}
	// The element group itself converts recursively into the array's component type.
	return ObjectArrayTypeInfo.getInfoFor(convertParquetTypeToTypeInfo(elementType));
}

private static TypeInformation<?> convertParquetPrimitiveListToFlinkArray(Type type) {
// Backward-compatibility element group doesn't exist also allowed
TypeInformation<?> flinkType = convertParquetTypeToTypeInfo(type);
Expand Down Expand Up @@ -374,17 +375,33 @@ private static Type convertField(String fieldName, TypeInformation<?> typeInfo,
GroupType componentGroup = (GroupType) convertField(LIST_ELEMENT, objectArrayTypeInfo.getComponentInfo(),
Type.Repetition.REQUIRED, legacyMode);

GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
elementGroup = elementGroup.withNewFields(componentGroup.getFields());
fieldType = Types.buildGroup(repetition)
.addField(elementGroup)
.as(OriginalType.LIST)
.named(fieldName);
if (legacyMode) {
// LegacyMode is 2 Level List schema
fieldType = Types.buildGroup(repetition)
.addField(componentGroup)
.as(OriginalType.LIST)
.named(fieldName);
} else {
// Add extra layer of Group according to Parquet's standard
Type listGroup = Types.repeatedGroup()
.addField(componentGroup).named(LIST_GROUP_NAME);
fieldType = Types.buildGroup(repetition)
.addField(listGroup)
.as(OriginalType.LIST)
.named(fieldName);
}
} else if (typeInfo instanceof BasicArrayTypeInfo) {
BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;

// LegacyMode is 2 Level List schema
if (legacyMode) {

PrimitiveType primitiveTyp =
convertField(fieldName, basicArrayType.getComponentInfo(),
Type.Repetition.REQUIRED, legacyMode).asPrimitiveType();
fieldType = Types.buildGroup(repetition)
.addField(primitiveTyp)
.as(OriginalType.LIST).named(fieldName);
} else {
// Add extra layer of Group according to Parquet's standard
Type listGroup = Types.repeatedGroup().addField(
convertField(LIST_ELEMENT, basicArrayType.getComponentInfo(),
Expand All @@ -393,15 +410,6 @@ private static Type convertField(String fieldName, TypeInformation<?> typeInfo,
fieldType = Types.buildGroup(repetition)
.addField(listGroup)
.as(OriginalType.LIST).named(fieldName);
} else {
PrimitiveType primitiveTyp =
convertField(fieldName, basicArrayType.getComponentInfo(),
Type.Repetition.REQUIRED, legacyMode).asPrimitiveType();
fieldType = Types.buildGroup(repetition)
.repeated(primitiveTyp.getPrimitiveTypeName())
.as(primitiveTyp.getOriginalType())
.named(LIST_ARRAY_TYPE)
.as(OriginalType.LIST).named(fieldName);
}
} else if (typeInfo instanceof SqlTimeTypeInfo) {
if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.parquet.schema.Type.Repetition.REPEATED;

/**
* Extends from {@link GroupConverter} to convert an nested Parquet Record into Row.
*/
Expand Down Expand Up @@ -90,6 +92,7 @@ private static Converter createConverter(
} else if (typeInformation instanceof BasicArrayTypeInfo) {
Type elementType = field.asGroupType().getFields().get(0);
Class typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass();

if (typeClass.equals(Character.class)) {
return new RowConverter.ArrayConverter<Character>(elementType,
Character.class, BasicTypeInfo.CHAR_TYPE_INFO, parentDataHolder, fieldPos);
Expand Down Expand Up @@ -131,7 +134,6 @@ private static Converter createConverter(
} else if (typeInformation instanceof ObjectArrayTypeInfo) {
GroupType parquetGroupType = field.asGroupType();
Type elementType = parquetGroupType.getType(0);

return new RowConverter.ArrayConverter<Row>(elementType, Row.class,
((ObjectArrayTypeInfo) typeInformation).getComponentInfo(), parentDataHolder, fieldPos);
} else if (typeInformation instanceof RowTypeInfo) {
Expand Down Expand Up @@ -302,11 +304,13 @@ static class ArrayConverter<T> extends GroupConverter implements ParentDataHolde
this.elementClass = elementClass;
this.parentDataHolder = parentDataHolder;
this.pos = pos;

if (elementClass.equals(Row.class)) {
this.elementConverter = createConverter(elementType, 0, elementTypeInfo, this);
if (isElementType(elementType, elementTypeInfo)) {
elementConverter = createArrayElementConverter(elementType, elementClass,
elementTypeInfo, this);
} else {
this.elementConverter = new RowConverter.RowPrimitiveConverter(elementType, this, 0);
GroupType groupType = elementType.asGroupType();
elementConverter = new ArrayElementConverter(groupType, elementClass,
elementTypeInfo, this);
}
}

Expand All @@ -329,6 +333,46 @@ public void end() {
public void add(int fieldIndex, Object object) {
list.add((T) object);
}

/**
 * Converter for the synthetic repeated group that wraps each list element in a
 * standard 3-level Parquet list:
 *
 * <pre>
 * optional group the_list (LIST) {
 *   repeated group array { <-- this layer
 *     optional (type) element;
 *   }
 * }
 * </pre>
 */
static class ArrayElementConverter extends GroupConverter {
	private final Converter elementConverter;

	/**
	 * @param repeatedType the repeated wrapper group; must contain exactly one field,
	 *                     the actual element type (enforced by callers via isElementType)
	 * @param elementClass Java class of the list element (e.g. Row.class or a boxed primitive)
	 * @param elementTypeInfo Flink type information for the element
	 * @param holder parent that collects converted element values
	 */
	public ArrayElementConverter(
			GroupType repeatedType,
			Class elementClass,
			TypeInformation elementTypeInfo,
			ParentDataHolder holder) {
		// Unwrap the single inner field of the synthetic group.
		Type elementType = repeatedType.getType(0);
		this.elementConverter = createArrayElementConverter(elementType, elementClass,
			elementTypeInfo, holder);
	}

	@Override
	public Converter getConverter(int i) {
		return elementConverter;
	}

	@Override
	public void start() {
		// No per-element state at this wrapper level; the inner converter handles values.
	}

	@Override
	public void end() {
		// Nothing buffered here; values are emitted directly to the parent holder.
	}
}
}

static class MapConverter extends GroupConverter {
Expand Down Expand Up @@ -395,4 +439,38 @@ public void end() {
}
}
}

/**
 * Builds the converter for a single list element: a full (possibly nested) row
 * converter when the element is a Row, otherwise a primitive converter.
 */
static Converter createArrayElementConverter(
	Type elementType,
	Class elementClass,
	TypeInformation elementTypeInfo,
	ParentDataHolder holder) {
	return elementClass.equals(Row.class)
		? createConverter(elementType, 0, elementTypeInfo, holder)
		: new RowConverter.RowPrimitiveConverter(elementType, holder, 0);
}

/**
 * Returns whether the given type is the element type of a list or is a
 * synthetic group with one field that is the element type. This is
 * determined by checking whether the type can be a synthetic group and by
 * checking whether a potential synthetic group matches the expected schema.
 *
 * @param repeatedType a type that may be the element type
 * @param typeInformation the expected flink type for list elements (currently
 *                        unused; kept for interface stability)
 * @return {@code true} if the repeatedType is the element schema
 */
static boolean isElementType(Type repeatedType, TypeInformation typeInformation) {
	// A valid synthetic wrapper is a group with exactly one non-repeated field.
	// Anything else (a primitive, a multi-field group, or a group whose single
	// field is itself repeated) must be the element type directly.
	return repeatedType.isPrimitive()
		|| repeatedType.asGroupType().getFieldCount() > 1
		|| repeatedType.asGroupType().getType(0).isRepetition(REPEATED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -43,21 +45,27 @@
/**
* Test cases for reading Map from Parquet files.
*/
public class ParquetMapInputFormatTest {
@RunWith(Parameterized.class)
public class ParquetMapInputFormatTest extends TestUtil {
private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();

@ClassRule
public static TemporaryFolder tempRoot = new TemporaryFolder();

public ParquetMapInputFormatTest(boolean useLegacyMode) {
super(useLegacyMode);
}

@Test
@SuppressWarnings("unchecked")
public void testReadMapFromNestedRecord() throws IOException {
Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
Collections.singletonList(nested.f1), getConfiguration());
MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);

ParquetMapInputFormat inputFormat = new ParquetMapInputFormat(path, nestedType);
inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
inputFormat.setRuntimeContext(getMockRuntimeContext());

FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
Expand All @@ -84,12 +92,13 @@ public void testReadMapFromNestedRecord() throws IOException {
@SuppressWarnings("unchecked")
public void testProjectedReadMapFromNestedRecord() throws IOException {
Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
Collections.singletonList(nested.f1), getConfiguration());
MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
ParquetMapInputFormat inputFormat = new ParquetMapInputFormat(path, nestedType);

inputFormat.selectFields(Collections.singletonList("nestedMap").toArray(new String[0]));
inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
inputFormat.setRuntimeContext(getMockRuntimeContext());

FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -43,17 +45,23 @@
/**
* Test cases for reading Pojo from Parquet files.
*/
public class ParquetPojoInputFormatTest {
@RunWith(Parameterized.class)
public class ParquetPojoInputFormatTest extends TestUtil {
private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();

@ClassRule
public static TemporaryFolder tempRoot = new TemporaryFolder();

public ParquetPojoInputFormatTest(boolean useLegacyMode) {
super(useLegacyMode);
}

@Test
public void testReadPojoFromSimpleRecord() throws IOException {
Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
MessageType messageType = getSchemaConverter().convert(SIMPLE_SCHEMA);
Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
Collections.singletonList(simple.f1), getConfiguration());

ParquetPojoInputFormat<PojoSimpleRecord> inputFormat = new ParquetPojoInputFormat<>(
path, messageType, (PojoTypeInfo<PojoSimpleRecord>) Types.POJO(PojoSimpleRecord.class));
Expand All @@ -72,12 +80,13 @@ public void testReadPojoFromSimpleRecord() throws IOException {
@Test
public void testProjectedReadPojoFromSimpleRecord() throws IOException, NoSuchFieldError {
Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
MessageType messageType = getSchemaConverter().convert(SIMPLE_SCHEMA);
Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
Collections.singletonList(simple.f1), getConfiguration());

ParquetPojoInputFormat<PojoSimpleRecord> inputFormat = new ParquetPojoInputFormat<>(
path, messageType, (PojoTypeInfo<PojoSimpleRecord>) Types.POJO(PojoSimpleRecord.class));
inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
inputFormat.setRuntimeContext(getMockRuntimeContext());

FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
Expand Down
Loading

0 comments on commit c77e6f9

Please sign in to comment.