use TypeInfo to serialize to MetacatType instead of ObjectInspector (#585) (#592)

Co-authored-by: Yingjian Wu <[email protected]>
stevie9868 and Yingjian Wu committed May 13, 2024
1 parent 73a37fe commit c54dbda
Showing 4 changed files with 333 additions and 89 deletions.
@@ -31,20 +31,16 @@
import com.netflix.metacat.common.type.VarcharType;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
@@ -78,24 +74,20 @@ public class HiveTypeConverter implements ConnectorTypeConverter {
private static final Pattern DECIMAL_TYPE
= Pattern.compile(DECIMAL_WITH_SCALE + "|" + DECIMAL_WITH_SCALE_AND_PRECISION, Pattern.CASE_INSENSITIVE);

private static Type getPrimitiveType(final ObjectInspector fieldInspector) {
final PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) fieldInspector)
.getPrimitiveCategory();
private static Type getPrimitiveType(final TypeInfo typeInfo) {
final PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
if (HiveTypeMapping.getHIVE_TO_CANONICAL().containsKey(primitiveCategory.name())) {
return HiveTypeMapping.getHIVE_TO_CANONICAL().get(primitiveCategory.name());
}
switch (primitiveCategory) {
case DECIMAL:
final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) fieldInspector)
.getTypeInfo();
final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
return DecimalType.createDecimalType(decimalTypeInfo.precision(), decimalTypeInfo.getScale());
case CHAR:
final int cLength = ((CharTypeInfo) ((PrimitiveObjectInspector)
fieldInspector).getTypeInfo()).getLength();
final int cLength = ((CharTypeInfo) typeInfo).getLength();
return CharType.createCharType(cLength);
case VARCHAR:
final int vLength = ((VarcharTypeInfo) ((PrimitiveObjectInspector) fieldInspector)
.getTypeInfo()).getLength();
final int vLength = ((VarcharTypeInfo) typeInfo).getLength();
return VarcharType.createVarcharType(vLength);
default:
return null;
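
With TypeInfo, parameterized primitives expose their precision, scale, and length directly, so the conversion no longer needs an ObjectInspector round-trip. A minimal sketch of that behavior (hypothetical demo class; only Hive's serde2 library is assumed, and the accessors mirror those used in the new code above):

import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public final class PrimitiveTypeInfoDemo {
    public static void main(final String[] args) {
        // Parsing a type string yields a TypeInfo subclass that carries its
        // own parameters; no ObjectInspector is needed to read them.
        final DecimalTypeInfo decimal = (DecimalTypeInfo)
                TypeInfoUtils.getTypeInfoFromTypeString("decimal(38,9)");
        System.out.println(decimal.precision() + "," + decimal.getScale()); // 38,9

        final CharTypeInfo chr = (CharTypeInfo)
                TypeInfoUtils.getTypeInfoFromTypeString("char(10)");
        System.out.println(chr.getLength()); // 10
    }
}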
@@ -106,17 +98,7 @@ private static Type getPrimitiveType(final ObjectInspector fieldInspector) {
public Type toMetacatType(final String type) {
// Hack to fix presto "varchar" type coming in with no length which is required by Hive.
final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(sanitizeType(type));
ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
// The standard struct object inspector forces field names to lower case, however in Metacat we need to preserve
// the original case of the struct fields so we wrap it with our wrapper to force the fieldNames to keep
// their original case
if (typeInfo.getCategory().equals(ObjectInspector.Category.STRUCT)) {
final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
final StandardStructObjectInspector objectInspector = (StandardStructObjectInspector) oi;
oi = new HiveTypeConverter.SameCaseStandardStructObjectInspector(
structTypeInfo.getAllStructFieldNames(), objectInspector);
}
return getCanonicalType(oi);
return getCanonicalType(typeInfo);
}
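
The struct-wrapping logic removed above existed only because StandardStructObjectInspector lower-cases field names; recursing over the TypeInfo sidesteps that entirely. A minimal sketch of the difference (hypothetical demo class, assuming Hive's serde2 library on the classpath):

import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public final class FieldCaseDemo {
    public static void main(final String[] args) {
        final StructTypeInfo struct = (StructTypeInfo)
                TypeInfoUtils.getTypeInfoFromTypeString(
                        "struct<Field1:bigint,countryCodes:array<string>>");

        // The parsed TypeInfo keeps field names exactly as declared.
        System.out.println(struct.getAllStructFieldNames()); // [Field1, countryCodes]

        // The standard object inspector lower-cases them, which is what the
        // removed SameCaseStandardStructObjectInspector wrapper had to undo.
        final StructObjectInspector oi = (StructObjectInspector)
                TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(struct);
        oi.getAllStructFieldRefs().forEach(f -> System.out.println(f.getFieldName()));
        // prints: field1, countrycodes
    }
}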

/**
@@ -305,43 +287,48 @@ public static String sanitizeType(final String type) {
/**
* Returns the canonical type.
*
* @param fieldInspector inspector
* @return type
* @param typeInfo typeInfo
* @return Metacat Type
*/
Type getCanonicalType(final ObjectInspector fieldInspector) {
switch (fieldInspector.getCategory()) {
Type getCanonicalType(final TypeInfo typeInfo) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
return getPrimitiveType(fieldInspector);
return getPrimitiveType(typeInfo);
case MAP:
final MapObjectInspector mapObjectInspector =
TypeUtils.checkType(fieldInspector, MapObjectInspector.class,
"fieldInspector");
final Type keyType = getCanonicalType(mapObjectInspector.getMapKeyObjectInspector());
final Type valueType = getCanonicalType(mapObjectInspector.getMapValueObjectInspector());
final MapTypeInfo mapTypeInfo =
TypeUtils.checkType(typeInfo, MapTypeInfo.class, "typeInfo");
final Type keyType = getCanonicalType(mapTypeInfo.getMapKeyTypeInfo());
final Type valueType = getCanonicalType(mapTypeInfo.getMapValueTypeInfo());
if (keyType == null || valueType == null) {
return null;
}
return TypeRegistry.getTypeRegistry().getParameterizedType(TypeEnum.MAP,
ImmutableList.of(keyType.getTypeSignature(), valueType.getTypeSignature()), ImmutableList.of());
case LIST:
final ListObjectInspector listObjectInspector =
TypeUtils.checkType(fieldInspector, ListObjectInspector.class,
"fieldInspector");
final ListTypeInfo listTypeInfo =
TypeUtils.checkType(typeInfo, ListTypeInfo.class, "typeInfo");
final Type elementType =
getCanonicalType(listObjectInspector.getListElementObjectInspector());
getCanonicalType(listTypeInfo.getListElementTypeInfo());
if (elementType == null) {
return null;
}
return TypeRegistry.getTypeRegistry().getParameterizedType(TypeEnum.ARRAY,
ImmutableList.of(elementType.getTypeSignature()), ImmutableList.of());
case STRUCT:
final StructObjectInspector structObjectInspector =
TypeUtils.checkType(fieldInspector, StructObjectInspector.class, "fieldInspector");
final List<TypeSignature> fieldTypes = new ArrayList<>();
final List<Object> fieldNames = new ArrayList<>();
for (StructField field : structObjectInspector.getAllStructFieldRefs()) {
fieldNames.add(field.getFieldName());
final Type fieldType = getCanonicalType(field.getFieldObjectInspector());
final StructTypeInfo structTypeInfo =
TypeUtils.checkType(typeInfo, StructTypeInfo.class, "typeInfo");
// Hive struct type infos
final List<String> structFieldNames = structTypeInfo.getAllStructFieldNames();
final List<TypeInfo> structFieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
final int structInfoCounts = structFieldNames.size();

// Metacat canonical type infos
final List<TypeSignature> fieldTypes = new ArrayList<>(structInfoCounts);
final List<Object> fieldNames = new ArrayList<>(structInfoCounts);

for (int i = 0; i < structInfoCounts; i++) {
fieldNames.add(structFieldNames.get(i));
final Type fieldType = getCanonicalType(structFieldTypeInfos.get(i));
if (fieldType == null) {
return null;
}
@@ -350,42 +337,8 @@ Type getCanonicalType(final ObjectInspector fieldInspector) {
return TypeRegistry.getTypeRegistry()
.getParameterizedType(TypeEnum.ROW, fieldTypes, fieldNames);
default:
log.info("Currently unsupported type {}, returning Unknown type", fieldInspector.getTypeName());
log.info("Currently unsupported type {}, returning Unknown type", typeInfo.getTypeName());
return BaseType.UNKNOWN;
}
}
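
The recursion bottoms out at PRIMITIVE and otherwise descends through the child TypeInfos. A small sketch of the child accessors it relies on (hypothetical demo class; the accessors are the same ones used in the hunk above):

import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public final class ChildTypeInfoDemo {
    public static void main(final String[] args) {
        final MapTypeInfo map = (MapTypeInfo)
                TypeInfoUtils.getTypeInfoFromTypeString("map<string,array<int>>");
        System.out.println(map.getMapKeyTypeInfo().getTypeName());   // string
        System.out.println(map.getMapValueTypeInfo().getTypeName()); // array<int>

        final ListTypeInfo list = (ListTypeInfo) map.getMapValueTypeInfo();
        System.out.println(list.getListElementTypeInfo().getTypeName()); // int
    }
}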

// This is protected and extends StandardStructObjectInspector so it can reference MyField
protected static class SameCaseStandardStructObjectInspector extends StandardStructObjectInspector {
private final List<String> realFieldNames;
private final StandardStructObjectInspector structObjectInspector;

public SameCaseStandardStructObjectInspector(final List<String> realFieldNames,
final StandardStructObjectInspector structObjectInspector) {
this.realFieldNames = realFieldNames;
this.structObjectInspector = structObjectInspector;
}

@Override
public List<? extends StructField> getAllStructFieldRefs() {
return structObjectInspector.getAllStructFieldRefs()
.stream()
.map(structField -> (MyField) structField)
.map(field -> new HiveTypeConverter.
SameCaseStandardStructObjectInspector.SameCaseMyField(field.getFieldID(),
realFieldNames.get(field.getFieldID()),
field.getFieldObjectInspector(), field.getFieldComment()))
.collect(Collectors.toList());
}

protected static class SameCaseMyField extends MyField {
public SameCaseMyField(final int fieldID, final String fieldName,
final ObjectInspector fieldObjectInspector,
final String fieldComment) {
super(fieldID, fieldName, fieldObjectInspector, fieldComment);
// Since super lower cases fieldName, this is to restore the original case
this.fieldName = fieldName;
}
}
}
}
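
End to end, the converter now parses the Hive type string once and walks the resulting TypeInfo tree. A hypothetical usage sketch (the import path and no-arg constructor are assumptions inferred from the spec below, not part of this diff):

import com.netflix.metacat.common.type.Type;
import com.netflix.metacat.connector.hive.converters.HiveTypeConverter; // assumed package

public final class ToMetacatTypeDemo {
    public static void main(final String[] args) {
        final HiveTypeConverter converter = new HiveTypeConverter(); // assumed no-arg constructor
        final Type rowType = converter.toMetacatType(
                "struct<Field1:bigint,field3:struct<NESTED_Field1:decimal(38,9)>>");
        // The ROW type keeps "Field1" and "NESTED_Field1" exactly as declared.
        System.out.println(rowType.getTypeSignature());
    }
}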
@@ -119,6 +119,11 @@ class HiveTypeConverterSpec extends Specification {
"struct<prediction_date:int,lower_confidence_amt:double,upper_confidence_amt:double,model_short_name:string>",
"struct<prediction_date:int,lower_confidence_amt:int,upper_confidence_amt:int,model_short_name:string>",
"struct<prediction_date:int,prediction_source:string>",

// Nested Type with UpperCase
'array<struct<date:string,countryCodes:array<string>,source:string>>',
"struct<Field3:struct<Nested_Field1:bigint,Nested_Field2:bigint>>",
"struct<Field1:bigint,Field2:bigint,field3:struct<NESTED_Field1:bigint,NesteD_Field2:bigint>>"
]
}

@@ -218,6 +223,10 @@ class HiveTypeConverterSpec extends Specification {
"struct<prediction_date:int,lower_confidence_amt:double,upper_confidence_amt:double,model_short_name:string>" || "struct<prediction_date:int,lower_confidence_amt:double,upper_confidence_amt:double,model_short_name:string>"
"struct<prediction_date:int,lower_confidence_amt:int,upper_confidence_amt:int,model_short_name:string>" || "struct<prediction_date:int,lower_confidence_amt:int,upper_confidence_amt:int,model_short_name:string>"
"struct<prediction_date:int,prediction_source:string>" || "struct<prediction_date:int,prediction_source:string>"

'array<struct<field2:decimal(38 ),countryCodes:array<string>,source:string>>' || 'array<struct<field2:decimal(38),countryCodes:array<string>,source:string>>'
"struct<Field3:struct<Nested_Field1:bigint,Nested_Field2:bigint,Nested_FIELD3:decimal( 38, 9)>>" || "struct<Field3:struct<Nested_Field1:bigint,Nested_Field2:bigint,Nested_FIELD3:decimal(38,9)>>"
"struct<Field1:decimal (38,9 ),Field2:bigint,field3:struct<NESTED_Field1:decimal ( 38,9 ),NesteD_Field2:bigint>>" || "struct<Field1:decimal(38,9),Field2:bigint,field3:struct<NESTED_Field1:decimal(38,9),NesteD_Field2:bigint>>"
}
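
The added rows above exercise whitespace tolerance in parameterized decimals: the conversion normalizes spacing inside decimal(...) before Hive parses the string, consistent with the DECIMAL_TYPE pattern in the converter. A hypothetical sketch of that expectation (sanitizeType is public static per the hunk header earlier; the class import is assumed):

public final class SanitizeTypeDemo {
    public static void main(final String[] args) {
        // Expected per the table above: interior whitespace is stripped.
        System.out.println(HiveTypeConverter.sanitizeType("decimal ( 38, 9 )")); // decimal(38,9)
    }
}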

@Unroll
@@ -233,4 +242,17 @@
]
}

@Unroll
def 'case reserve fieldName Fidelity'(String typeString, String expectedString) {
expect:
def result = converter.fromMetacatTypeToJson(converter.toMetacatType(typeString)).toString()

assert result == expectedString

where:
typeString | expectedString
"struct<Field1:bigint,Field2:bigint,field3:struct<nested_Field1:bigint,nested_Field2:bigint>>" | """{"type":"row","fields":[{"name":"Field1","type":"bigint"},{"name":"Field2","type":"bigint"},{"name":"field3","type":{"type":"row","fields":[{"name":"nested_Field1","type":"bigint"},{"name":"nested_Field2","type":"bigint"}]}}]}"""
"array<struct<date:string,countryCodes:array<string>,source:string>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}"""
"array<struct<Date:string,nestedArray:array<struct<date:string,countryCodes:array<string>,source:string>>>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"Date","type":"string"},{"name":"nestedArray","type":{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}}]}}"""
}
}
@@ -0,0 +1,157 @@
{
"format-version" : 1,
"table-uuid" : "6d9ede8f-61cb-469e-a590-4757602df691",
"location" : "file:/tmp/data",
"last-updated-ms" : 1712701649817,
"last-column-id" : 9,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "dateint",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "info",
"required" : false,
"type" : {
"type" : "struct",
"fields" : [ {
"id" : 3,
"name" : "name",
"required" : false,
"type" : "string"
}, {
"id" : 4,
"name" : "address",
"required" : false,
"type" : {
"type" : "struct",
"fields" : [ {
"id" : 6,
"name" : "NAME",
"required" : false,
"type" : "string"
} ]
}
}, {
"id" : 5,
"name" : "nestedArray",
"required" : false,
"type" : {
"type" : "list",
"element-id" : 7,
"element" : {
"type" : "struct",
"fields" : [ {
"id" : 8,
"name" : "FIELD1",
"required" : false,
"type" : "string"
}, {
"id" : 9,
"name" : "field2",
"required" : false,
"type" : "string"
} ]
},
"element-required" : false
}
} ]
}
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "dateint",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "info",
"required" : false,
"type" : {
"type" : "struct",
"fields" : [ {
"id" : 3,
"name" : "name",
"required" : false,
"type" : "string"
}, {
"id" : 4,
"name" : "address",
"required" : false,
"type" : {
"type" : "struct",
"fields" : [ {
"id" : 6,
"name" : "NAME",
"required" : false,
"type" : "string"
} ]
}
}, {
"id" : 5,
"name" : "nestedArray",
"required" : false,
"type" : {
"type" : "list",
"element-id" : 7,
"element" : {
"type" : "struct",
"fields" : [ {
"id" : 8,
"name" : "FIELD1",
"required" : false,
"type" : "string"
}, {
"id" : 9,
"name" : "field2",
"required" : false,
"type" : "string"
} ]
},
"element-required" : false
}
} ]
}
} ]
} ],
"partition-spec" : [ {
"name" : "dateint",
"transform" : "identity",
"source-id" : 1,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "dateint",
"transform" : "identity",
"source-id" : 1,
"field-id" : 1000
} ]
} ],
"last-partition-id" : 1000,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"field.metadata.json" : "{\"1\":{},\"2\":{},\"3\":{},\"4\":{},\"5\":{},\"6\":{},\"8\":{},\"9\":{}}"
},
"current-snapshot-id" : -1,
"refs" : { },
"snapshots" : [ ],
"statistics" : [ ],
"snapshot-log" : [ ],
"metadata-log" : [ ]
}