upgrade iceberg to org.apache.iceberg:iceberg-spark-runtime (#375)
* upgrade iceberg to org.apache.iceberg:iceberg-spark-runtime

* Changes to add back currentMetadataLocation() in IcebergTableOps
zhljen committed Nov 13, 2019
1 parent 4dae419 commit 37efc39
Showing 18 changed files with 140 additions and 93 deletions.
8 changes: 2 additions & 6 deletions build.gradle
@@ -154,9 +154,7 @@ configure(javaProjects) {
dependency("org.elasticsearch.client:transport:5.4.1")
dependency("net.snowflake:snowflake-jdbc:3.4.2")
dependency("com.esotericsoftware.kryo:kryo:2.22")
dependency("com.github.Netflix.iceberg:iceberg-common:${iceberg_version}")
dependency("com.github.Netflix.iceberg:iceberg-core:${iceberg_version}")
dependency("com.github.Netflix.iceberg:iceberg-api:${iceberg_version}")
dependency("org.apache.iceberg:iceberg-spark-runtime:${iceberg_version}")
}
}

@@ -173,16 +171,14 @@ configure(javaProjects) {
/*******************************
* Compile Dependencies
*******************************/

compile("log4j:log4j")

/*******************************
* Provided Dependencies
*******************************/

compileOnly("com.google.code.findbugs:annotations")
compileOnly("com.google.code.findbugs:jsr305")

/*******************************
* Runtime Dependencies
*******************************/
2 changes: 1 addition & 1 deletion gradle.properties
@@ -38,4 +38,4 @@ hive_version=1.2.1
org.gradle.parallel=false
org.gradle.daemon=false

iceberg_version=0.6.3
iceberg_version=0.7.0-incubating
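Note: the move to the Apache Incubator changes both the Maven coordinates and the Java packages, so com.netflix.iceberg.* becomes org.apache.iceberg.* throughout the files below. The iceberg-spark-runtime artifact is a shaded jar that bundles what the separate iceberg-common, iceberg-core, and iceberg-api modules used to provide, which is why three dependency lines collapse into one. A minimal sketch of code compiled against the new artifact (the schema fields are illustrative, not from this repository):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PackageRenameSketch {
    public static void main(final String[] args) {
        // Same API surface as 0.6.x; only the package prefix changed.
        final Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
        System.out.println(schema);
    }
}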
@@ -596,6 +596,7 @@ public String toString() {
return qualifiedName;
}


/**
* Checks if a CharSequence is empty ("") or null.
*/
4 changes: 1 addition & 3 deletions metacat-connector-hive/build.gradle
@@ -31,9 +31,7 @@ dependencies {
}

compile("commons-dbutils:commons-dbutils")
compile('com.github.Netflix.iceberg:iceberg-common')
compile('com.github.Netflix.iceberg:iceberg-core')
compile('com.github.Netflix.iceberg:iceberg-api')
compile("org.apache.iceberg:iceberg-spark-runtime")
/*******************************
* Provided Dependencies
*******************************/
@@ -185,7 +185,7 @@ public TableInfo fromIcebergTableToTableInfo(final QualifiedName name,
final IcebergTableWrapper tableWrapper,
final String tableLoc,
final TableInfo tableInfo) {
final com.netflix.iceberg.Table table = tableWrapper.getTable();
final org.apache.iceberg.Table table = tableWrapper.getTable();
final List<FieldInfo> allFields =
this.hiveTypeConverter.icebergeSchemaTofieldDtos(table.schema(), table.spec().fields());
final Map<String, String> tableParameters = new HashMap<>();
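Note: after this change the converter reads everything through org.apache.iceberg.Table. A hedged sketch of the accessors it relies on (the helper class below is illustrative, not part of this commit):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

public final class TableAccessorsSketch {
    private TableAccessorsSketch() { }

    // Everything the converter needs is reachable from the loaded Table handle.
    public static void describe(final Table table) {
        final Schema schema = table.schema();    // column definitions
        final PartitionSpec spec = table.spec(); // partition transforms
        System.out.println(schema.columns().size() + " columns, "
            + spec.fields().size() + " partition fields");
        table.properties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}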
@@ -15,9 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.netflix.iceberg.PartitionField;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.types.Types;
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.common.server.connectors.model.FieldInfo;
import com.netflix.metacat.common.type.BaseType;
@@ -48,6 +45,9 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.List;
@@ -138,7 +138,7 @@ public List<FieldInfo> icebergeSchemaTofieldDtos(final Schema schema,
* @param type iceberg type.
* @return hive type string.
*/
public static String fromIcebergToHiveType(final com.netflix.iceberg.types.Type type) {
public static String fromIcebergToHiveType(final org.apache.iceberg.types.Type type) {
switch (type.typeId()) {
case BOOLEAN:
return serdeConstants.BOOLEAN_TYPE_NAME;
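Note: fromIcebergToHiveType now switches on org.apache.iceberg.types.Type.typeId(). A hedged sketch of a few primitive cases, assuming Hive's serdeConstants names (the method in this file covers the full TypeID enum):

import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.types.Type;

public final class TypeMappingSketch {
    private TypeMappingSketch() { }

    // Illustrative subset of the Iceberg-to-Hive type mapping.
    public static String toHiveType(final Type type) {
        switch (type.typeId()) {
            case BOOLEAN:
                return serdeConstants.BOOLEAN_TYPE_NAME; // "boolean"
            case INTEGER:
                return serdeConstants.INT_TYPE_NAME;     // "int"
            case LONG:
                return serdeConstants.BIGINT_TYPE_NAME;  // "bigint"
            case DOUBLE:
                return serdeConstants.DOUBLE_TYPE_NAME;  // "double"
            case STRING:
                return serdeConstants.STRING_TYPE_NAME;  // "string"
            default:
                throw new IllegalArgumentException("Unhandled type: " + type);
        }
    }
}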
@@ -21,18 +21,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.netflix.iceberg.BaseMetastoreTableOperations;
import com.netflix.iceberg.BaseMetastoreTables;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.ScanSummary;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableMetadata;
import com.netflix.iceberg.TableMetadataParser;
import com.netflix.iceberg.UpdateSchema;
import com.netflix.iceberg.exceptions.NoSuchTableException;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.types.Types;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.exception.MetacatBadRequestException;
import com.netflix.metacat.common.exception.MetacatNotSupportedException;
@@ -50,6 +38,24 @@
import com.netflix.spectator.api.Registry;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ScanSummary;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;

import java.io.StringReader;
import java.util.HashMap;
@@ -58,6 +64,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;


/**
* Iceberg table handler which interacts with iceberg library
* to perform iceberg table loading, querying, etc. The operations limit to
@@ -79,8 +86,8 @@ public class IcebergTableHandler {
/**
* Constructor.
*
* @param connectorContext connector context
* @param icebergTableCriteria iceberg table criteria
* @param connectorContext connector context
* @param icebergTableCriteria iceberg table criteria
* @param icebergTableOpWrapper iceberg table operation
*/
public IcebergTableHandler(final ConnectorContext connectorContext,
@@ -153,20 +160,21 @@ public Map<String, ScanSummary.PartitionMetrics> getIcebergTablePartitionMap(
* @return iceberg table
*/
public IcebergTableWrapper getIcebergTable(final QualifiedName tableName, final String tableMetadataLocation,
final boolean includeInfoDetails) {
final boolean includeInfoDetails) {
final long start = this.registry.clock().wallTime();
try {
this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation);
log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation);
final IcebergMetastoreTables icebergMetastoreTables = new IcebergMetastoreTables(tableMetadataLocation);
final Table table = icebergMetastoreTables.load(tableName.toString());
final Table table = icebergMetastoreTables.loadTable(
HiveTableUtil.qualifiedNameToTableIdentifier(tableName));
final Map<String, String> extraProperties = Maps.newHashMap();
if (includeInfoDetails) {
extraProperties.put(DirectSqlTable.PARAM_METADATA_CONTENT,
TableMetadataParser.toJson(icebergMetastoreTables.getTableOps().current()));
}
return new IcebergTableWrapper(table, extraProperties);
} catch (NoSuchTableException e) {
} catch (NotFoundException | NoSuchTableException e) {
throw new InvalidMetaException(tableName, e);
} finally {
final long duration = registry.clock().wallTime() - start;
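Note: loading now goes through the catalog API, so the plain string table name is replaced by a TableIdentifier. A hedged sketch of what a helper like HiveTableUtil.qualifiedNameToTableIdentifier can reduce to (the actual implementation lives in HiveTableUtil and is not shown in this diff):

import com.netflix.metacat.common.QualifiedName;
import org.apache.iceberg.catalog.TableIdentifier;

public final class IdentifierSketch {
    private IdentifierSketch() { }

    // Illustrative conversion from Metacat's QualifiedName to the
    // database/table pair the Iceberg catalog API expects.
    public static TableIdentifier toIdentifier(final QualifiedName name) {
        return TableIdentifier.of(name.getDatabaseName(), name.getTableName());
    }
}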
@@ -178,6 +186,7 @@ public IcebergTableWrapper getIcebergTable(final QualifiedName tableName, final

/**
* Updates the iceberg schema if the provided tableInfo has updated field comments.
*
* @param tableInfo table information
* @return true if an update is done
*/
@@ -195,10 +204,11 @@ public boolean update(final TableInfo tableInfo) {
throw new MetacatBadRequestException(message);
}
final IcebergMetastoreTables icebergMetastoreTables = new IcebergMetastoreTables(tableMetadataLocation);
final Table table = icebergMetastoreTables.load(tableName.toString());
final Table table = icebergMetastoreTables.loadTable(
HiveTableUtil.qualifiedNameToTableIdentifier(tableName));
final UpdateSchema updateSchema = table.updateSchema();
final Schema schema = table.schema();
for (FieldInfo field: fields) {
for (FieldInfo field : fields) {
final Types.NestedField iField = schema.findField(field.getName());
if (iField != null && !Objects.equals(field.getComment(), iField.doc())) {
updateSchema.updateColumnDoc(field.getName(), field.getComment());
@@ -211,7 +221,7 @@ public boolean update(final TableInfo tableInfo) {
if (!tableMetadataLocation.equalsIgnoreCase(newTableMetadataLocation)) {
tableInfo.getMetadata().put(DirectSqlTable.PARAM_PREVIOUS_METADATA_LOCATION, tableMetadataLocation);
tableInfo.getMetadata().put(DirectSqlTable.PARAM_METADATA_LOCATION,
icebergMetastoreTables.getTableOps().currentMetadataLocation());
newTableMetadataLocation);
}
}
}
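Note: apart from the package rename, the schema-update cycle is unchanged: stage edits on the UpdateSchema builder, then commit() writes a new metadata version. A hedged sketch with an illustrative column name:

import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;

public final class UpdateDocSketch {
    private UpdateDocSketch() { }

    public static void updateComment(final Table table) {
        final UpdateSchema update = table.updateSchema();
        update.updateColumnDoc("id", "primary identifier"); // illustrative column
        update.commit(); // writes a new table metadata version
    }
}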
@@ -248,52 +258,75 @@ private ObjectNode getMetricValueNode(final ScanSummary.PartitionMetrics metrics
* Implemented BaseMetastoreTables to interact with iceberg library.
* Load an iceberg table from a location.
*/
private final class IcebergMetastoreTables extends BaseMetastoreTables {
private final class IcebergMetastoreTables extends BaseMetastoreCatalog {
private final String tableLocation;
private BaseMetastoreTableOperations tableOperations;
private IcebergTableOps tableOperations;

IcebergMetastoreTables(final String tableMetadataLocation) {
super(conf);
this.tableLocation = tableMetadataLocation;
}

@Override
public Table create(final Schema schema, final PartitionSpec spec, final String database, final String table) {
public Table createTable(final TableIdentifier identifier,
final Schema schema,
final PartitionSpec spec,
final String location,
final Map<String, String> properties) {
throw new MetacatNotSupportedException("not supported");
}

@Override
public Table create(final Schema schema,
final PartitionSpec spec,
final Map<String, String> properties,
final String table) {
throw new MetacatNotSupportedException("Not supported");
public Transaction newCreateTableTransaction(final TableIdentifier identifier,
final Schema schema,
final PartitionSpec spec,
final String location,
final Map<String, String> properties) {
throw new MetacatNotSupportedException("not supported");
}

@Override
public Table create(final Schema schema, final PartitionSpec spec, final String tables) {
throw new MetacatNotSupportedException("Not supported");
public Transaction newReplaceTableTransaction(final TableIdentifier identifier,
final Schema schema,
final PartitionSpec spec,
final String location,
final Map<String, String> properties,
final boolean orCreate) {
throw new MetacatNotSupportedException("not supported");
}

@Override
public Table load(final String tableName) {
final QualifiedName table = QualifiedName.fromString(tableName);
return super.load(table.getDatabaseName(), table.getTableName());
public Table loadTable(final TableIdentifier identifier) {
return super.loadTable(identifier);
}

@Override
public BaseMetastoreTableOperations newTableOps(final Configuration config,
final String database,
final String table) {
protected TableOperations newTableOps(final TableIdentifier tableIdentifier) {
return getTableOps();
}

@Override
protected String defaultWarehouseLocation(final TableIdentifier tableIdentifier) {
throw new MetacatNotSupportedException("not supported");
}

@Override
public boolean dropTable(final TableIdentifier identifier,
final boolean purge) {
throw new MetacatNotSupportedException("not supported");
}

@Override
public void renameTable(final TableIdentifier from,
final TableIdentifier to) {
throw new MetacatNotSupportedException("not supported");
}

/**
* Creates a new instance of IcebergTableOps for the given table location, if not exists.
*
* @return a MetacatServerOps for the table
*/
public BaseMetastoreTableOperations getTableOps() {
public IcebergTableOps getTableOps() {
if (tableOperations == null) {
tableOperations = new IcebergTableOps(tableLocation);
}
@@ -309,11 +342,15 @@ private final class IcebergTableOps extends BaseMetastoreTableOperations {
private String location;

IcebergTableOps(final String location) {
super(conf);
this.location = location;
refresh();
}

@Override
public FileIO io() {
return new HadoopFileIO(conf);
}

@Override
public TableMetadata refresh() {
refreshFromMetadataLocation(this.location,
@@ -326,10 +363,16 @@ public String currentMetadataLocation() {
return location;
}

@Override
public TableMetadata current() {
return super.current();
}

@Override
public void commit(final TableMetadata base, final TableMetadata metadata) {
if (!base.equals(metadata)) {
location = writeNewMetadata(metadata, currentVersion() + 1);
this.requestRefresh();
}
}
}
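Note: the re-added currentMetadataLocation() override returns the location written by commit(), so a caller can read the new metadata path after an update. A hedged fragment using this file's own private classes; the method name and column doc are illustrative, not part of this commit:

    // Hypothetical helper inside IcebergTableHandler (the catalog classes
    // above are private to this file).
    private String commitAndGetLocation(final QualifiedName tableName,
                                        final String tableMetadataLocation) {
        final IcebergMetastoreTables tables = new IcebergMetastoreTables(tableMetadataLocation);
        final Table table = tables.loadTable(HiveTableUtil.qualifiedNameToTableIdentifier(tableName));
        table.updateSchema().updateColumnDoc("id", "illustrative doc").commit();
        return tables.getTableOps().currentMetadataLocation(); // location written by commit()
    }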
@@ -344,15 +387,16 @@ private void recordTimer(final String requestTag, final long duration) {
final HashMap<String, String> tags = new HashMap<>();
tags.put("request", requestTag);
this.registry.timer(registry.createId(IcebergRequestMetrics.TimerIcebergRequest.getMetricName())
.withTags(tags))
.record(duration, TimeUnit.MILLISECONDS);
.withTags(tags))
.record(duration, TimeUnit.MILLISECONDS);
log.debug("## Time taken to complete {} is {} ms", requestTag, duration);
}

/**
* increase the counter of operation.
*
* @param metricName metric name
* @param tableName table name of the operation
* @param tableName table name of the operation
*/
private void increaseCounter(final String metricName, final QualifiedName tableName) {
this.registry.counter(registry.createId(metricName).withTags(tableName.parts())).increment();
Expand Down
@@ -18,14 +18,14 @@

import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.netflix.iceberg.ScanSummary;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.properties.Config;
import com.netflix.metacat.common.server.util.ThreadServiceManager;
import com.netflix.metacat.connector.hive.util.HiveConfigConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.ScanSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;

import javax.annotation.Nullable;
import java.util.Map;
@@ -43,6 +43,7 @@
@Slf4j
public class IcebergTableOpWrapper {
private final Config config;

private final Map<String, String> configuration;
private final ThreadServiceManager threadServiceManager;

@@ -66,7 +67,7 @@ public IcebergTableOpWrapper(final ConnectorContext connectorContext,
* @return scan summary map
*/
public Map<String, ScanSummary.PartitionMetrics> getPartitionMetricsMap(final Table icebergTable,
@Nullable final Expression filter) {
@Nullable final Expression filter) {
Map<String, ScanSummary.PartitionMetrics> result = Maps.newHashMap();
//
// Cancel the iceberg call if it times out.
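Note: a hedged sketch of the kind of ScanSummary call this wrapper guards with a timeout; ScanSummary walks the manifests matching the filter and aggregates per-partition metrics:

import java.util.Map;
import org.apache.iceberg.ScanSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;

public final class PartitionMetricsSketch {
    private PartitionMetricsSketch() { }

    public static Map<String, ScanSummary.PartitionMetrics> summarize(
        final Table table, final Expression filter) {
        // Builds a filtered scan, then summarizes matching partitions.
        return ScanSummary.of(table.newScan().filter(filter)).build();
    }
}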