diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergRequestMetrics.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergRequestMetrics.java index ae44d360c..65e786d59 100644 --- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergRequestMetrics.java +++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergRequestMetrics.java @@ -52,7 +52,7 @@ enum Type { private final String metricName; IcebergRequestMetrics(final IcebergRequestMetrics.Type type, final String measure) { - this.metricName = String.format("metacat.iceberg.%s.%s.%s", type.name(), type.name(), measure); + this.metricName = String.format("metacat.iceberg.%s.%s", type.name(), measure); } IcebergRequestMetrics(final String name) { diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergTableHandler.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergTableHandler.java index 5d941e046..f2b2961d8 100644 --- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergTableHandler.java +++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergTableHandler.java @@ -127,15 +127,18 @@ public Map getIcebergTablePartitionMap( * @return iceberg table */ public Table getIcebergTable(final QualifiedName tableName, final String tableMetadataLocation) { - this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation); final long start = this.registry.clock().wallTime(); - log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation); - final Table table = new IcebergMetastoreTables(tableMetadataLocation).load(tableName.toString()); - final long duration = registry.clock().wallTime() - start; - log.info("Time taken to getIcebergTable {} is {} ms", tableName, duration); - this.icebergTableRequestMetrics.recordTimer(IcebergRequestMetrics.TagLoadTable.getMetricName(), duration); - this.icebergTableRequestMetrics.increaseCounter(IcebergRequestMetrics.TagLoadTable.getMetricName(), tableName); - return table; + try { + this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation); + log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation); + return new IcebergMetastoreTables(tableMetadataLocation).load(tableName.toString()); + } finally { + final long duration = registry.clock().wallTime() - start; + log.info("Time taken to getIcebergTable {} is {} ms", tableName, duration); + this.icebergTableRequestMetrics.recordTimer(IcebergRequestMetrics.TagLoadTable.getMetricName(), duration); + this.icebergTableRequestMetrics.increaseCounter( + IcebergRequestMetrics.TagLoadTable.getMetricName(), tableName); + } } /** diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/monitoring/HiveMetrics.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/monitoring/HiveMetrics.java index 9468362a5..2ba064229 100644 --- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/monitoring/HiveMetrics.java +++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/monitoring/HiveMetrics.java @@ -84,7 +84,7 @@ enum Type { private final String metricName; HiveMetrics(final Type type, final String measure) { - this.metricName = "metacat.hive." + type.name() + "." + type.name() + "." + measure; + this.metricName = String.format("metacat.hive.%s.%s", type.name(), measure); } HiveMetrics(final String name) { this.metricName = name; diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/HiveConnectorFastPartitionService.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/HiveConnectorFastPartitionService.java index 746316650..490b00d19 100644 --- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/HiveConnectorFastPartitionService.java +++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/HiveConnectorFastPartitionService.java @@ -25,6 +25,7 @@ import com.netflix.metacat.common.server.connectors.ConnectorRequestContext; import com.netflix.metacat.common.server.connectors.ConnectorUtils; import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException; +import com.netflix.metacat.common.server.connectors.model.AuditInfo; import com.netflix.metacat.common.server.connectors.model.PartitionInfo; import com.netflix.metacat.common.server.connectors.model.PartitionListRequest; import com.netflix.metacat.common.server.connectors.model.StorageInfo; @@ -47,7 +48,10 @@ import org.apache.hadoop.hive.metastore.api.Table; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.time.Instant; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; @@ -358,11 +362,11 @@ private List getIcebergPartitionInfos( final Map partitionMap = icebergTableHandler.getIcebergTablePartitionMap(tableName, partitionsRequest, icebergTable); - final List filteredPartitionList; final List partitionIds = partitionsRequest.getPartitionNames(); final Sort sort = partitionsRequest.getSort(); + final AuditInfo tableAuditInfo = tableInfo.getAudit(); - filteredPartitionList = partitionMap.keySet().stream() + final List filteredPartitionList = partitionMap.keySet().stream() .filter(partitionName -> partitionIds == null || partitionIds.contains(partitionName)) .map(partitionName -> PartitionInfo.builder().name( QualifiedName.ofPartition(tableName.getCatalogName(), @@ -370,8 +374,12 @@ private List getIcebergPartitionInfos( tableName.getTableName(), partitionName)) .dataMetrics(icebergTableHandler.getDataMetadataFromIcebergMetrics(partitionMap.get(partitionName))) - .auditInfo(tableInfo.getAudit()).build()) + .auditInfo(AuditInfo.builder().createdBy(tableAuditInfo.getCreatedBy()) + .createdDate(fromEpochMilliToDate(partitionMap.get(partitionName).dataTimestampMillis())) + .lastModifiedDate(fromEpochMilliToDate(partitionMap.get(partitionName).dataTimestampMillis())) + .build()).build()) .collect(Collectors.toList()); + if (sort != null) { //it can only support sortBy partition Name final Comparator nameComparator = Comparator.comparing(p -> p.getName().toString()); @@ -380,5 +388,7 @@ private List getIcebergPartitionInfos( return ConnectorUtils.paginate(filteredPartitionList, pageable); } - + private Date fromEpochMilliToDate(@Nullable final Long l) { + return (l == null) ? null : Date.from(Instant.ofEpochMilli(l)); + } } diff --git a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/HiveConnectorFastPartitionSpec.groovy b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/HiveConnectorFastPartitionSpec.groovy index df7a8839c..dfdde0cc3 100644 --- a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/HiveConnectorFastPartitionSpec.groovy +++ b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/HiveConnectorFastPartitionSpec.groovy @@ -69,11 +69,14 @@ class HiveConnectorFastPartitionSpec extends Specification { def setupSpec() { conf.icebergEnabled >> true + metric1.fileCount() >> 1 + metric1.dataTimestampMillis() >> 1234500000 + metric1.recordCount() >> 1 hiveConnectorFastPartitionService.icebergTableHandler = icebergTableHandler icebergTableHandler.getIcebergTable(_,_) >> Mock(Table) icebergTableHandler.getIcebergTablePartitionMap(_,_,_) >> ["dateint=20170101/hour=1": metric1, - "dateint=20170102/hour=1": metric1, - "dateint=20170103/hour=1": metric1] + "dateint=20170102/hour=1": metric1, + "dateint=20170103/hour=1": metric1] } @Unroll @@ -84,12 +87,18 @@ class HiveConnectorFastPartitionSpec extends Specification { partitionListRequest, MetacatDataInfoProvider.getIcebergTableInfo("icebergtable")) then: - partionInfos.collect { it.getName().getPartitionName() }.flatten() == results + partionInfos.collect { [it.getName().getPartitionName(), + it.getAudit().createdDate.toInstant().toEpochMilli(), + it.getAudit().lastModifiedDate.toInstant().toEpochMilli(), + it.getAudit().createdBy] } == results where: partitionListRequest | results - new PartitionListRequest(null, ["dateint=20170101/hour=1"],false, null, new Sort(), null ) | ["dateint=20170101/hour=1"] - new PartitionListRequest(null, null, false, null, new Sort(), null) | ["dateint=20170101/hour=1", "dateint=20170102/hour=1", "dateint=20170103/hour=1"] - new PartitionListRequest(null, null, false, null, new Sort(null, SortOrder.DESC), null) | ["dateint=20170103/hour=1", "dateint=20170102/hour=1", "dateint=20170101/hour=1"] + new PartitionListRequest(null, ["dateint=20170101/hour=1"],false, null, + new Sort(), null ) | [["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"]] + new PartitionListRequest(null, null, false, null, + new Sort(), null) | [["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170102/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170103/hour=1", 1234500000, 1234500000, "metacat_test"]] + new PartitionListRequest(null, null, false, null, + new Sort(null, SortOrder.DESC), null) | [["dateint=20170103/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170102/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"]] } def "Test for get iceberg table partitionKeys" (){ diff --git a/metacat-testdata-provider/src/main/groovy/com/netflix/metacat/testdata/provider/MetacatDataInfoProvider.groovy b/metacat-testdata-provider/src/main/groovy/com/netflix/metacat/testdata/provider/MetacatDataInfoProvider.groovy index 4408ac006..2b4f2a49f 100644 --- a/metacat-testdata-provider/src/main/groovy/com/netflix/metacat/testdata/provider/MetacatDataInfoProvider.groovy +++ b/metacat-testdata-provider/src/main/groovy/com/netflix/metacat/testdata/provider/MetacatDataInfoProvider.groovy @@ -116,7 +116,7 @@ class MetacatDataInfoProvider { .serde( StorageInfo.builder().owner("test") .build()) .metadata ( ['table_type': 'ICEBERG']) - .auditInfo( AuditInfo.builder().build()) + .auditInfo( AuditInfo.builder().createdBy("metacat_test").build()) .build() ]