Adding iceberg partition dateCreated override (#309)
zhljen authored and ajoymajumdar committed Nov 16, 2018
1 parent 90d7c33 commit b9f540e
Showing 6 changed files with 43 additions and 21 deletions.

IcebergRequestMetrics.java

@@ -52,7 +52,7 @@ enum Type {
     private final String metricName;
 
     IcebergRequestMetrics(final IcebergRequestMetrics.Type type, final String measure) {
-        this.metricName = String.format("metacat.iceberg.%s.%s.%s", type.name(), type.name(), measure);
+        this.metricName = String.format("metacat.iceberg.%s.%s", type.name(), measure);
     }
 
     IcebergRequestMetrics(final String name) {
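
The old format string bound type.name() to two placeholders, so every metric ID came out with a duplicated segment, e.g. metacat.iceberg.loadTable.loadTable.time instead of metacat.iceberg.loadTable.time. A minimal sketch of the difference (the loadTable constant and the time measure are illustrative, not taken from the diff); the HiveMetrics change further down is the same fix, plus a switch from string concatenation to String.format:

    public class MetricNameSketch {
        enum Type { loadTable } // hypothetical enum constant, for illustration only

        public static void main(final String[] args) {
            final Type type = Type.loadTable;
            final String measure = "time";
            // Before: type.name() was passed twice, duplicating the segment.
            System.out.println(String.format("metacat.iceberg.%s.%s.%s", type.name(), type.name(), measure));
            // -> metacat.iceberg.loadTable.loadTable.time
            // After: each placeholder is bound exactly once.
            System.out.println(String.format("metacat.iceberg.%s.%s", type.name(), measure));
            // -> metacat.iceberg.loadTable.time
        }
    }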

IcebergTableHandler.java

@@ -127,15 +127,18 @@ public Map<String, ScanSummary.PartitionMetrics> getIcebergTablePartitionMap(
      * @return iceberg table
      */
     public Table getIcebergTable(final QualifiedName tableName, final String tableMetadataLocation) {
-        this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation);
         final long start = this.registry.clock().wallTime();
-        log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation);
-        final Table table = new IcebergMetastoreTables(tableMetadataLocation).load(tableName.toString());
-        final long duration = registry.clock().wallTime() - start;
-        log.info("Time taken to getIcebergTable {} is {} ms", tableName, duration);
-        this.icebergTableRequestMetrics.recordTimer(IcebergRequestMetrics.TagLoadTable.getMetricName(), duration);
-        this.icebergTableRequestMetrics.increaseCounter(IcebergRequestMetrics.TagLoadTable.getMetricName(), tableName);
-        return table;
+        try {
+            this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation);
+            log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation);
+            return new IcebergMetastoreTables(tableMetadataLocation).load(tableName.toString());
+        } finally {
+            final long duration = registry.clock().wallTime() - start;
+            log.info("Time taken to getIcebergTable {} is {} ms", tableName, duration);
+            this.icebergTableRequestMetrics.recordTimer(IcebergRequestMetrics.TagLoadTable.getMetricName(), duration);
+            this.icebergTableRequestMetrics.increaseCounter(
+                IcebergRequestMetrics.TagLoadTable.getMetricName(), tableName);
+        }
     }
 
     /**
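
Wrapping the load in try/finally means the timer and counter are updated even when checkCriteria or the metadata load throws, so failed loads still show up in the request metrics. A standalone sketch of the pattern under that reading, with System.nanoTime() standing in for the registry clock used above and a hypothetical timed helper:

    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    public class TimedCallSketch {
        // Hypothetical helper: runs work and records its duration even if it throws.
        static <T> T timed(final String name, final Supplier<T> work) {
            final long start = System.nanoTime();
            try {
                return work.get();
            } finally {
                final long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                System.out.printf("%s took %d ms%n", name, durationMs);
            }
        }

        public static void main(final String[] args) {
            System.out.println(timed("getIcebergTable", () -> "loaded"));
        }
    }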

HiveMetrics.java

@@ -84,7 +84,7 @@ enum Type {
     private final String metricName;
 
     HiveMetrics(final Type type, final String measure) {
-        this.metricName = "metacat.hive." + type.name() + "." + type.name() + "." + measure;
+        this.metricName = String.format("metacat.hive.%s.%s", type.name(), measure);
     }
     HiveMetrics(final String name) {
         this.metricName = name;

HiveConnectorFastPartitionService.java

@@ -25,6 +25,7 @@
 import com.netflix.metacat.common.server.connectors.ConnectorRequestContext;
 import com.netflix.metacat.common.server.connectors.ConnectorUtils;
 import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException;
+import com.netflix.metacat.common.server.connectors.model.AuditInfo;
 import com.netflix.metacat.common.server.connectors.model.PartitionInfo;
 import com.netflix.metacat.common.server.connectors.model.PartitionListRequest;
 import com.netflix.metacat.common.server.connectors.model.StorageInfo;
@@ -47,7 +48,10 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.time.Instant;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -358,20 +362,24 @@ private List<PartitionInfo> getIcebergPartitionInfos(
         final Map<String, ScanSummary.PartitionMetrics> partitionMap
             = icebergTableHandler.getIcebergTablePartitionMap(tableName, partitionsRequest, icebergTable);
 
-        final List<PartitionInfo> filteredPartitionList;
         final List<String> partitionIds = partitionsRequest.getPartitionNames();
         final Sort sort = partitionsRequest.getSort();
+        final AuditInfo tableAuditInfo = tableInfo.getAudit();
 
-        filteredPartitionList = partitionMap.keySet().stream()
+        final List<PartitionInfo> filteredPartitionList = partitionMap.keySet().stream()
             .filter(partitionName -> partitionIds == null || partitionIds.contains(partitionName))
             .map(partitionName -> PartitionInfo.builder().name(
                 QualifiedName.ofPartition(tableName.getCatalogName(),
                     tableName.getDatabaseName(),
                     tableName.getTableName(),
                     partitionName))
                 .dataMetrics(icebergTableHandler.getDataMetadataFromIcebergMetrics(partitionMap.get(partitionName)))
-                .auditInfo(tableInfo.getAudit()).build())
+                .auditInfo(AuditInfo.builder().createdBy(tableAuditInfo.getCreatedBy())
+                    .createdDate(fromEpochMilliToDate(partitionMap.get(partitionName).dataTimestampMillis()))
+                    .lastModifiedDate(fromEpochMilliToDate(partitionMap.get(partitionName).dataTimestampMillis()))
+                    .build()).build())
             .collect(Collectors.toList());
+
         if (sort != null) {
             //it can only support sortBy partition Name
             final Comparator<PartitionInfo> nameComparator = Comparator.comparing(p -> p.getName().toString());
@@ -380,5 +388,7 @@ private List<PartitionInfo> getIcebergPartitionInfos(
         return ConnectorUtils.paginate(filteredPartitionList, pageable);
     }
 
-
+    private Date fromEpochMilliToDate(@Nullable final Long l) {
+        return (l == null) ? null : Date.from(Instant.ofEpochMilli(l));
+    }
 }
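
Taken together, these changes are the dateCreated override from the commit title: each partition's AuditInfo now carries that partition's own data timestamp as createdDate and lastModifiedDate, while createdBy is still inherited from the table, and a missing timestamp stays null instead of throwing. A self-contained sketch of the helper's contract; the 1234500000L value is the fixture stubbed in the spec below:

    import java.time.Instant;
    import java.util.Date;

    public class EpochToDateSketch {
        // Mirrors fromEpochMilliToDate above: null-safe epoch-millis to java.util.Date.
        static Date fromEpochMilliToDate(final Long epochMillis) {
            return (epochMillis == null) ? null : Date.from(Instant.ofEpochMilli(epochMillis));
        }

        public static void main(final String[] args) {
            // Round-trips the epoch value, a date in mid-January 1970.
            System.out.println(fromEpochMilliToDate(1234500000L).toInstant().toEpochMilli()); // 1234500000
            System.out.println(fromEpochMilliToDate(null)); // null, no NullPointerException
        }
    }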

HiveConnectorFastPartitionSpec.groovy

@@ -69,11 +69,14 @@ class HiveConnectorFastPartitionSpec extends Specification {
 
     def setupSpec() {
         conf.icebergEnabled >> true
+        metric1.fileCount() >> 1
+        metric1.dataTimestampMillis() >> 1234500000
+        metric1.recordCount() >> 1
         hiveConnectorFastPartitionService.icebergTableHandler = icebergTableHandler
         icebergTableHandler.getIcebergTable(_,_) >> Mock(Table)
         icebergTableHandler.getIcebergTablePartitionMap(_,_,_) >> ["dateint=20170101/hour=1": metric1,
-                                                                   "dateint=20170102/hour=1": metric1,
-                                                                   "dateint=20170103/hour=1": metric1]
+                                                                  "dateint=20170102/hour=1": metric1,
+                                                                  "dateint=20170103/hour=1": metric1]
     }
 
     @Unroll
@@ -84,12 +87,18 @@ class HiveConnectorFastPartitionSpec extends Specification {
             partitionListRequest, MetacatDataInfoProvider.getIcebergTableInfo("icebergtable"))
 
         then:
-        partionInfos.collect { it.getName().getPartitionName() }.flatten() == results
+        partionInfos.collect { [it.getName().getPartitionName(),
+                                it.getAudit().createdDate.toInstant().toEpochMilli(),
+                                it.getAudit().lastModifiedDate.toInstant().toEpochMilli(),
+                                it.getAudit().createdBy] } == results
         where:
         partitionListRequest | results
-        new PartitionListRequest(null, ["dateint=20170101/hour=1"],false, null, new Sort(), null ) | ["dateint=20170101/hour=1"]
-        new PartitionListRequest(null, null, false, null, new Sort(), null) | ["dateint=20170101/hour=1", "dateint=20170102/hour=1", "dateint=20170103/hour=1"]
-        new PartitionListRequest(null, null, false, null, new Sort(null, SortOrder.DESC), null) | ["dateint=20170103/hour=1", "dateint=20170102/hour=1", "dateint=20170101/hour=1"]
+        new PartitionListRequest(null, ["dateint=20170101/hour=1"],false, null,
+            new Sort(), null ) | [["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"]]
+        new PartitionListRequest(null, null, false, null,
+            new Sort(), null) | [["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170102/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170103/hour=1", 1234500000, 1234500000, "metacat_test"]]
+        new PartitionListRequest(null, null, false, null,
+            new Sort(null, SortOrder.DESC), null) | [["dateint=20170103/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170102/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"]]
     }
 
     def "Test for get iceberg table partitionKeys" (){

MetacatDataInfoProvider.groovy

@@ -116,7 +116,7 @@ class MetacatDataInfoProvider {
             .serde( StorageInfo.builder().owner("test")
                 .build())
             .metadata ( ['table_type': 'ICEBERG'])
-            .auditInfo( AuditInfo.builder().build())
+            .auditInfo( AuditInfo.builder().createdBy("metacat_test").build())
             .build()
     ]
