From c54dbdad5cc969d0838b918fb374eefe1dcf0d01 Mon Sep 17 00:00:00 2001
From: stevie9868 <151791653+stevie9868@users.noreply.github.com>
Date: Mon, 13 May 2024 13:03:10 -0700
Subject: [PATCH] use TypeInfo to serialize to MetacatType instead of
 ObjectInspector (#585) (#592)

Co-authored-by: Yingjian Wu
---
 .../hive/converters/HiveTypeConverter.java    | 127 +++++---------
 .../converters/HiveTypeConverterSpec.groovy   |  22 +++
 ...-438f-413e-96c2-2694d7926529.metadata.json | 157 ++++++++++++++++++
 .../netflix/metacat/MetacatSmokeSpec.groovy   | 116 ++++++++++++-
 4 files changed, 333 insertions(+), 89 deletions(-)
 create mode 100644 metacat-functional-tests/metacat-test-cluster/etc-metacat/data/metadata/00000-0b60cc39-438f-413e-96c2-2694d7926529.metadata.json

diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java
index ffdb53e90..625a0990e 100644
--- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java
+++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java
@@ -31,20 +31,16 @@
 import com.netflix.metacat.common.type.VarcharType;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
@@ -78,24 +74,20 @@ public class HiveTypeConverter implements ConnectorTypeConverter {
     private static final Pattern DECIMAL_TYPE =
         Pattern.compile(DECIMAL_WITH_SCALE + "|" + DECIMAL_WITH_SCALE_AND_PRECISION, Pattern.CASE_INSENSITIVE);
 
-    private static Type getPrimitiveType(final ObjectInspector fieldInspector) {
-        final PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) fieldInspector)
-            .getPrimitiveCategory();
+    private static Type getPrimitiveType(final TypeInfo typeInfo) {
+        final PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
         if (HiveTypeMapping.getHIVE_TO_CANONICAL().containsKey(primitiveCategory.name())) {
             return HiveTypeMapping.getHIVE_TO_CANONICAL().get(primitiveCategory.name());
         }
         switch (primitiveCategory) {
             case DECIMAL:
-                final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) fieldInspector)
-                    .getTypeInfo();
+                final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
                 return DecimalType.createDecimalType(decimalTypeInfo.precision(), decimalTypeInfo.getScale());
             case CHAR:
-                final int cLength = ((CharTypeInfo) ((PrimitiveObjectInspector)
-                    fieldInspector).getTypeInfo()).getLength();
+                final int cLength = ((CharTypeInfo) typeInfo).getLength();
                 return CharType.createCharType(cLength);
             case VARCHAR:
-                final int vLength = ((VarcharTypeInfo) ((PrimitiveObjectInspector) fieldInspector)
-                    .getTypeInfo()).getLength();
+                final int vLength = ((VarcharTypeInfo) typeInfo).getLength();
                 return VarcharType.createVarcharType(vLength);
             default:
                 return null;
@@ -106,17 +98,7 @@ private static Type getPrimitiveType(final ObjectInspector fieldInspector) {
     public Type toMetacatType(final String type) {
         // Hack to fix presto "varchar" type coming in with no length which is required by Hive.
         final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(sanitizeType(type));
-        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
-        // The standard struct object inspector forces field names to lower case, however in Metacat we need to preserve
-        // the original case of the struct fields so we wrap it with our wrapper to force the fieldNames to keep
-        // their original case
-        if (typeInfo.getCategory().equals(ObjectInspector.Category.STRUCT)) {
-            final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
-            final StandardStructObjectInspector objectInspector = (StandardStructObjectInspector) oi;
-            oi = new HiveTypeConverter.SameCaseStandardStructObjectInspector(
-                structTypeInfo.getAllStructFieldNames(), objectInspector);
-        }
-        return getCanonicalType(oi);
+        return getCanonicalType(typeInfo);
     }
 
     /**
@@ -305,43 +287,48 @@ public static String sanitizeType(final String type) {
     /**
      * Returns the canonical type.
      *
-     * @param fieldInspector inspector
-     * @return type
+     * @param typeInfo typeInfo
+     * @return Metacat Type
      */
-    Type getCanonicalType(final ObjectInspector fieldInspector) {
-        switch (fieldInspector.getCategory()) {
+    Type getCanonicalType(final TypeInfo typeInfo) {
+        switch (typeInfo.getCategory()) {
             case PRIMITIVE:
-                return getPrimitiveType(fieldInspector);
+                return getPrimitiveType(typeInfo);
             case MAP:
-                final MapObjectInspector mapObjectInspector =
-                    TypeUtils.checkType(fieldInspector, MapObjectInspector.class,
-                        "fieldInspector");
-                final Type keyType = getCanonicalType(mapObjectInspector.getMapKeyObjectInspector());
-                final Type valueType = getCanonicalType(mapObjectInspector.getMapValueObjectInspector());
+                final MapTypeInfo mapTypeInfo =
+                    TypeUtils.checkType(typeInfo, MapTypeInfo.class, "typeInfo");
+                final Type keyType = getCanonicalType(mapTypeInfo.getMapKeyTypeInfo());
+                final Type valueType = getCanonicalType(mapTypeInfo.getMapValueTypeInfo());
                 if (keyType == null || valueType == null) {
                     return null;
                 }
                 return TypeRegistry.getTypeRegistry().getParameterizedType(TypeEnum.MAP,
                     ImmutableList.of(keyType.getTypeSignature(), valueType.getTypeSignature()), ImmutableList.of());
             case LIST:
-                final ListObjectInspector listObjectInspector =
-                    TypeUtils.checkType(fieldInspector, ListObjectInspector.class,
-                        "fieldInspector");
+                final ListTypeInfo listTypeInfo =
+                    TypeUtils.checkType(typeInfo, ListTypeInfo.class, "typeInfo");
                 final Type elementType =
-                    getCanonicalType(listObjectInspector.getListElementObjectInspector());
+                    getCanonicalType(listTypeInfo.getListElementTypeInfo());
                 if (elementType == null) {
                     return null;
                 }
                 return TypeRegistry.getTypeRegistry().getParameterizedType(TypeEnum.ARRAY,
                     ImmutableList.of(elementType.getTypeSignature()), ImmutableList.of());
             case STRUCT:
-                final StructObjectInspector structObjectInspector =
-                    TypeUtils.checkType(fieldInspector, StructObjectInspector.class, "fieldInspector");
-                final List<Type> fieldTypes = new ArrayList<>();
-                final List<String> fieldNames = new ArrayList<>();
-                for (StructField field : structObjectInspector.getAllStructFieldRefs()) {
-                    fieldNames.add(field.getFieldName());
-                    final Type fieldType = getCanonicalType(field.getFieldObjectInspector());
+                final StructTypeInfo structTypeInfo =
+                    TypeUtils.checkType(typeInfo, StructTypeInfo.class, "typeInfo");
+                // Hive struct type infos
+                final List<String> structFieldNames = structTypeInfo.getAllStructFieldNames();
+                final List<TypeInfo> structFieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+                final int structInfoCounts = structFieldNames.size();
+
+                // Metacat canonical type infos
+                final List<Type> fieldTypes = new ArrayList<>(structInfoCounts);
+                final List<String> fieldNames = new ArrayList<>(structInfoCounts);
+
+                for (int i = 0; i < structInfoCounts; i++) {
+                    fieldNames.add(structFieldNames.get(i));
+                    final Type fieldType = getCanonicalType(structFieldTypeInfos.get(i));
                     if (fieldType == null) {
                         return null;
                     }
@@ -350,42 +337,8 @@ Type getCanonicalType(final ObjectInspector fieldInspector) {
                 return TypeRegistry.getTypeRegistry()
                     .getParameterizedType(TypeEnum.ROW, fieldTypes, fieldNames);
             default:
-                log.info("Currently unsupported type {}, returning Unknown type", fieldInspector.getTypeName());
+                log.info("Currently unsupported type {}, returning Unknown type", typeInfo.getTypeName());
                 return BaseType.UNKNOWN;
         }
     }
-
-    // This is protected and extends StandardStructObjectInspector so it can reference MyField
-    protected static class SameCaseStandardStructObjectInspector extends StandardStructObjectInspector {
-        private final List<String> realFieldNames;
-        private final StandardStructObjectInspector structObjectInspector;
-
-        public SameCaseStandardStructObjectInspector(final List<String> realFieldNames,
-                                                     final StandardStructObjectInspector structObjectInspector) {
-            this.realFieldNames = realFieldNames;
-            this.structObjectInspector = structObjectInspector;
-        }
-
-        @Override
-        public List getAllStructFieldRefs() {
-            return structObjectInspector.getAllStructFieldRefs()
-                .stream()
-                .map(structField -> (MyField) structField)
-                .map(field -> new HiveTypeConverter.
-                    SameCaseStandardStructObjectInspector.SameCaseMyField(field.getFieldID(),
-                    realFieldNames.get(field.getFieldID()),
-                    field.getFieldObjectInspector(), field.getFieldComment()))
-                .collect(Collectors.toList());
-        }
-
-        protected static class SameCaseMyField extends MyField {
-            public SameCaseMyField(final int fieldID, final String fieldName,
-                                   final ObjectInspector fieldObjectInspector,
-                                   final String fieldComment) {
-                super(fieldID, fieldName, fieldObjectInspector, fieldComment);
-                // Since super lower cases fieldName, this is to restore the original case
-                this.fieldName = fieldName;
-            }
-        }
-    }
 }
diff --git a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy
index 2414461aa..9a8282c3d 100644
--- a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy
+++ b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy
@@ -119,6 +119,11 @@ class HiveTypeConverterSpec extends Specification {
            "struct",
            "struct",
            "struct",
+
+            // Nested Type with UpperCase
+            'array<struct<date:string,countryCodes:array<string>,source:string>>',
+            "struct>",
+            "struct>"
        ]
    }
 
@@ -218,6 +223,10 @@ class HiveTypeConverterSpec extends Specification {
        "struct" || "struct"
        "struct" || "struct"
        "struct" || "struct"
+
+        'array<struct<date:string,countryCodes:array<string>,source:string>>' || 'array<struct<date:string,countryCodes:array<string>,source:string>>'
+        "struct>" || "struct>"
+        "struct>" || "struct>"
    }
 
    @Unroll
@@ -233,4 +242,17 @@ class HiveTypeConverterSpec extends Specification {
        ]
    }
 
+    @Unroll
+    def 'preserve fieldName case fidelity'(String typeString, String expectedString) {
+        expect:
+        def result = converter.fromMetacatTypeToJson(converter.toMetacatType(typeString)).toString()
+
+        assert result == expectedString
+
+        where:
+        typeString | expectedString
+        "struct<Field1:bigint,Field2:bigint,field3:struct<nested_Field1:bigint,nested_Field2:bigint>>" | """{"type":"row","fields":[{"name":"Field1","type":"bigint"},{"name":"Field2","type":"bigint"},{"name":"field3","type":{"type":"row","fields":[{"name":"nested_Field1","type":"bigint"},{"name":"nested_Field2","type":"bigint"}]}}]}"""
+        "array<struct<date:string,countryCodes:array<string>,source:string>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}"""
+        "array<struct<Date:string,nestedArray:array<struct<date:string,countryCodes:array<string>,source:string>>>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"Date","type":"string"},{"name":"nestedArray","type":{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}}]}}"""
+    }
 }
diff --git a/metacat-functional-tests/metacat-test-cluster/etc-metacat/data/metadata/00000-0b60cc39-438f-413e-96c2-2694d7926529.metadata.json b/metacat-functional-tests/metacat-test-cluster/etc-metacat/data/metadata/00000-0b60cc39-438f-413e-96c2-2694d7926529.metadata.json
new file mode 100644
index 000000000..eb3b83770
--- /dev/null
+++ b/metacat-functional-tests/metacat-test-cluster/etc-metacat/data/metadata/00000-0b60cc39-438f-413e-96c2-2694d7926529.metadata.json
@@ -0,0 +1,157 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "6d9ede8f-61cb-469e-a590-4757602df691",
+  "location" : "file:/tmp/data",
+  "last-updated-ms" : 1712701649817,
+  "last-column-id" : 9,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "dateint",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 2,
+      "name" : "info",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 3,
+          "name" : "name",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 4,
+          "name" : "address",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 6,
+              "name" : "NAME",
+              "required" : false,
+              "type" : "string"
+            } ]
+          }
+        }, {
+          "id" : 5,
+          "name" : "nestedArray",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : {
+              "type" : "struct",
+              "fields" : [ {
+                "id" : 8,
+                "name" : "FIELD1",
+                "required" : false,
+                "type" : "string"
+              }, {
+                "id" : 9,
+                "name" : "field2",
+                "required" : false,
+                "type" : "string"
+              } ]
+            },
+            "element-required" : false
+          }
+        } ]
+      }
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "dateint",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 2,
+      "name" : "info",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 3,
+          "name" : "name",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 4,
+          "name" : "address",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 6,
+              "name" : "NAME",
+              "required" : false,
+              "type" : "string"
+            } ]
+          }
+        }, {
+          "id" : 5,
+          "name" : "nestedArray",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : {
+              "type" : "struct",
+              "fields" : [ {
+                "id" : 8,
+                "name" : "FIELD1",
+                "required" : false,
+                "type" : "string"
+              }, {
+                "id" : 9,
+                "name" : "field2",
+                "required" : false,
+                "type" : "string"
+              } ]
+            },
+            "element-required" : false
+          }
+        } ]
+      }
+    } ]
+  } ],
+  "partition-spec" : [ {
+    "name" : "dateint",
+    "transform" : "identity",
+    "source-id" : 1,
+    "field-id" : 1000
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ {
+      "name" : "dateint",
+      "transform" : "identity",
+      "source-id" : 1,
+      "field-id" : 1000
+    } ]
+  } ],
+  "last-partition-id" : 1000,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "field.metadata.json" : "{\"1\":{},\"2\":{},\"3\":{},\"4\":{},\"5\":{},\"6\":{},\"8\":{},\"9\":{}}"
+  },
+  "current-snapshot-id" : -1,
+  "refs" : { },
+  "snapshots" : [ ],
+  "statistics" : [ ],
+  "snapshot-log" : [ ],
+  "metadata-log" : [ ]
+}
diff --git a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy
index efd19489f..697d8d647 100644
--- a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy
+++ b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy
@@ -27,12 +27,10 @@ import com.netflix.metacat.common.exception.MetacatBadRequestException
 import com.netflix.metacat.common.exception.MetacatNotFoundException
 import com.netflix.metacat.common.exception.MetacatNotSupportedException
 import com.netflix.metacat.common.exception.MetacatPreconditionFailedException
-import com.netflix.metacat.common.exception.MetacatTooManyRequestsException
 import com.netflix.metacat.common.exception.MetacatUnAuthorizedException
 import com.netflix.metacat.common.json.MetacatJson
 import com.netflix.metacat.common.json.MetacatJsonLocator
 import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException
-import com.netflix.metacat.common.server.connectors.exception.TableMigrationInProgressException
 import com.netflix.metacat.connector.hive.util.PartitionUtil
 import com.netflix.metacat.testdata.provider.PigDataDtoProvider
 import feign.Logger
@@ -41,6 +39,7 @@ import feign.Retryer
 import groovy.sql.Sql
 import org.apache.commons.io.FileUtils
 import org.joda.time.Instant
+import org.skyscreamer.jsonassert.JSONAssert
 import spock.lang.Ignore
 import spock.lang.Shared
 import spock.lang.Specification
@@ -166,6 +165,119 @@ class MetacatSmokeSpec extends Specification {
        thrown(MetacatNotSupportedException)
    }
 
+    @Unroll
+    def "Test create/get table with nested fields with upper case"() {
+        given:
+        def catalogName = 'embedded-fast-hive-metastore'
+        def databaseName = 'iceberg_db'
+        def tableName = 'iceberg_table_with_upper_case_nested_fields'
+        def uri = isLocalEnv ? String.format('file:/tmp/data/') : null
+        def tableDto = new TableDto(
+            name: QualifiedName.ofTable(catalogName, databaseName, tableName),
+            serde: new StorageDto(
+                owner: 'metacat-test',
+                inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
+                outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
+                serializationLib: 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
+                parameters: [
+                    'serialization.format': '1'
+                ],
+                uri: uri
+            ),
+            definitionMetadata: null,
+            dataMetadata: null,
+            fields: [
+                new FieldDto(
+                    comment: null,
+                    name: "dateint",
+                    pos: 0,
+                    type: "long",
+                    partition_key: true,
+                ),
+                new FieldDto(
+                    comment: null,
+                    name: "info",
+                    pos: 1,
+                    partition_key: false,
+                    type: "(name: chararray, address: (NAME: chararray), nestedArray: {(FIELD1: chararray, field2: chararray)})",
+                )
+            ]
+        )
+
+        def metadataLocation = String.format('/tmp/data/metadata/00000-0b60cc39-438f-413e-96c2-2694d7926529.metadata.json')
+
+        if (isIcebergTable) {
+            def metadata = [table_type: 'ICEBERG', metadata_location: metadataLocation]
+            tableDto.setMetadata(metadata)
+        }
+        when:
+        try {api.createDatabase(catalogName, databaseName, new DatabaseCreateRequestDto())
+        } catch (Exception ignored) {
+        }
+        api.createTable(catalogName, databaseName, tableName, tableDto)
+        def tableDTO = api.getTable(catalogName, databaseName, tableName, true, true, true)
+
+        then:
+        noExceptionThrown()
+        if (isIcebergTable) {
+            tableDTO.metadata.get("metadata_location").equals(metadataLocation)
+        }
+        tableDTO.getFields().size() == 2
+        def nestedFieldDto = tableDTO.getFields().find { it.name == "info" }
+        // assert that the type field also keeps the name fidelity
+        assert nestedFieldDto.type == "(name: chararray,address: (NAME: chararray),nestedArray: {(FIELD1: chararray,field2: chararray)})" : "The type differs from the expected. It is: $nestedFieldDto.type"
+
+        // assert that the json representation keeps the name fidelity
+        def expectedJsonString = """
+        {
+            "type": "row",
+            "fields": [
+                {
+                    "name": "name",
+                    "type": "chararray"
+                },
+                {
+                    "name": "address",
+                    "type": {
+                        "type": "row",
+                        "fields": [
+                            {
+                                "name": "NAME",
+                                "type": "chararray"
+                            }
+                        ]
+                    }
+                },
+                {
+                    "name": "nestedArray",
+                    "type": {
+                        "type": "array",
+                        "elementType": {
+                            "type": "row",
+                            "fields": [
+                                {
+                                    "name": "FIELD1",
+                                    "type": "chararray"
+                                },
+                                {
+                                    "name": "field2",
+                                    "type": "chararray"
+                                }
+                            ]
+                        }
+                    }
+                }
+            ]
+        }
+        """
+
+        JSONAssert.assertEquals(nestedFieldDto.jsonType.toString(), expectedJsonString, false)
+        cleanup:
+        api.deleteTable(catalogName, databaseName, tableName)
+        where:
+        isIcebergTable << [true, false]
+    }
+
    @Unroll
    def "Test create database for #catalogName/#databaseName"() {
        given: