Skip to content

Commit

Permalink
using guava retry logic and adding internal sql based hive table parti… (
Browse files Browse the repository at this point in the history
#71)

* using guava retry logic and adding internal sql based hive table partition services

* fixing the functionalTest failure

* fixing the style check

* changing functionalTest to use hive fast service

* changing to use existing embedded hive metastore impl

* using hivefastservice in functionalTest
  • Loading branch information
zhljen authored and ajoymajumdar committed May 11, 2017
1 parent c607765 commit 19c2fe5
Show file tree
Hide file tree
Showing 21 changed files with 1,555 additions and 431 deletions.
4 changes: 4 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Temporarily change release scope to major
release.scope=major

amazon_sns_version=1.11.60
airlift_version=0.116
cassandra_driver_version=3.1.4
Expand All @@ -27,3 +29,5 @@ postgresql_driver_version=42.0.0
redshift_driver_version=1.2.1.1001
slf4j_version=1.7.12
swagger_version=1.3.12
guavaretrying_version=2.0.0
commondbutil_version=1.6
5 changes: 5 additions & 0 deletions metacat-connector-hive/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ dependencies {
compile "com.google.inject:guice:${guice_version}"
compile "com.google.inject.extensions:guice-persist:${guice_version}"
compile "com.google.inject.extensions:guice-multibindings:${guice_version}"
// exclude guava 20 as it conflicts with the cassandra driver, which needs guava 19
compile("com.github.rholder:guava-retrying:${guavaretrying_version}") {
exclude group: 'com.google.guava'
}
compile "commons-dbutils:commons-dbutils:${commondbutil_version}"
runtime "mysql:mysql-connector-java:${mysql_connector_version}"
testCompile project(':metacat-testdata-provider')
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@
*/
@Slf4j
public class HiveConnectorFactory implements ConnectorFactory {
private static final String THRIFT_URI = HiveConfigConstants.THRIFT_URI;
private static final String HIVE_METASTORE_TIMEOUT = HiveConfigConstants.HIVE_METASTORE_TIMEOUT;
private static final String USE_EMBEDDED_METASTORE = HiveConfigConstants.USE_EMBEDDED_METASTORE;
private final String catalogName;
private final Map<String, String> configuration;
private final HiveConnectorInfoConverter infoConverter;
Expand All @@ -73,9 +70,13 @@ public HiveConnectorFactory(@Nonnull @NonNull final String catalogName,
this.infoConverter = infoConverter;
try {
final boolean useLocalMetastore = Boolean
.parseBoolean(configuration.getOrDefault(USE_EMBEDDED_METASTORE, "false"));
.parseBoolean(configuration.getOrDefault(HiveConfigConstants.USE_EMBEDDED_METASTORE, "false"));
if (!useLocalMetastore) {
client = createThriftClient();
if (configuration.containsKey(HiveConfigConstants.USE_FASTHIVE_SERVICE)) {
configuration.replace(HiveConfigConstants.USE_FASTHIVE_SERVICE, "false");
log.info("Always not use HiveConnectorFastService in thrift client mode");
}
} else {
client = createLocalClient();
}
Expand All @@ -97,6 +98,7 @@ private IMetacatHiveClient createLocalClient() throws MetaException {
return new EmbeddedHiveClient(catalogName, metacatHMSHandler,
MetacatHMSHandler.newRetryingHMSHandler(HiveConfigConstants.HIVE_HMSHANDLER_NAME,
conf, true, metacatHMSHandler));

}

private static HiveConf getDefaultConf() {
Expand All @@ -116,9 +118,10 @@ private static HiveConf getDefaultConf() {
private IMetacatHiveClient createThriftClient() throws MetaException {
final HiveMetastoreClientFactory factory =
new HiveMetastoreClientFactory(null,
(int) HiveConnectorUtil.toTime(configuration.getOrDefault(HIVE_METASTORE_TIMEOUT, "20s"),
(int) HiveConnectorUtil.toTime(
configuration.getOrDefault(HiveConfigConstants.HIVE_METASTORE_TIMEOUT, "20s"),
TimeUnit.SECONDS, TimeUnit.MILLISECONDS));
final String metastoreUri = configuration.get(THRIFT_URI);
final String metastoreUri = configuration.get(HiveConfigConstants.THRIFT_URI);
URI uri = null;
try {
uri = new URI(metastoreUri);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.metacat.connector.hive;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.util.DataSourceManager;
import com.netflix.metacat.common.util.ThreadServiceManager;
import com.netflix.metacat.connector.hive.converters.HiveConnectorInfoConverter;
import lombok.NonNull;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;

import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Named;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
* HiveConnectorFastTableService.
*
* @author zhenl
* @since 1.0.0
*/
/**
 * Table service that answers URI-based lookups directly against the Hive
 * metastore SQL database instead of going through the Thrift API.
 * All other table operations are inherited from {@link HiveConnectorTableService}.
 *
 * @author zhenl
 * @since 1.0.0
 */
public class HiveConnectorFastTableService extends HiveConnectorTableService {
    // Joins DBS/TBLS/SDS so the storage location (SDS.LOCATION) can be filtered;
    // the location predicate is appended in listNames() before execution.
    private static final String SQL_GET_TABLE_NAMES_BY_URI =
        "select d.name schema_name, t.tbl_name table_name"
            + " from DBS d, TBLS t, SDS s where d.DB_ID=t.DB_ID and t.sd_id=s.sd_id";
    private final ThreadServiceManager threadServiceManager;

    /**
     * Constructor.
     *
     * @param catalogName                  catalog name
     * @param metacatHiveClient            hive client
     * @param hiveConnectorDatabaseService database service
     * @param hiveMetacatConverters        hive converter
     * @param threadServiceManager         thread service manager (started eagerly here)
     * @param allowRenameTable             allow rename table (enforced by the superclass)
     */
    @Inject
    public HiveConnectorFastTableService(@Named("catalogName") final String catalogName,
                                         @Nonnull @NonNull final IMetacatHiveClient metacatHiveClient,
                                         @Nonnull @NonNull final HiveConnectorDatabaseService hiveConnectorDatabaseService,
                                         @Nonnull @NonNull final HiveConnectorInfoConverter hiveMetacatConverters,
                                         final ThreadServiceManager threadServiceManager,
                                         @Named("allowRenameTable") final boolean allowRenameTable) {
        super(catalogName, metacatHiveClient, hiveConnectorDatabaseService, hiveMetacatConverters, allowRenameTable);
        // No local copy of allowRenameTable is kept: the superclass owns that flag.
        this.threadServiceManager = threadServiceManager;
        this.threadServiceManager.start();
    }

    /**
     * Returns the qualified names of all tables whose storage location matches the given URI.
     *
     * @param uri          storage location to match
     * @param prefixSearch when true, match any location starting with {@code uri};
     *                     when false, require an exact location match
     * @return list of tables matching the search
     */
    public List<QualifiedName> listNames(final String uri, final boolean prefixSearch) {
        final List<QualifiedName> result = Lists.newArrayList();
        // Data source registered for this catalog (direct metastore DB access).
        final DataSource dataSource = DataSourceManager.get().get(catalogName);
        // Append the location predicate; the value itself is always bound as a
        // parameter, never concatenated, to avoid SQL injection.
        final StringBuilder queryBuilder = new StringBuilder(SQL_GET_TABLE_NAMES_BY_URI);
        String param = uri;
        if (prefixSearch) {
            queryBuilder.append(" and location like ?");
            param = uri + "%";
        } else {
            queryBuilder.append(" and location = ?");
        }
        // Accumulate each (schema, table) row as a QualifiedName.
        final ResultSetHandler<List<QualifiedName>> handler = rs -> {
            while (rs.next()) {
                final String schemaName = rs.getString("schema_name");
                final String tableName = rs.getString("table_name");
                result.add(QualifiedName.ofTable(catalogName, schemaName, tableName));
            }
            return result;
        };
        try (Connection conn = dataSource.getConnection()) {
            new QueryRunner()
                .query(conn, queryBuilder.toString(), handler, param);
        } catch (SQLException e) {
            // Same effect as the deprecated Throwables.propagate(e): wrap the
            // checked SQLException in an unchecked RuntimeException.
            throw new RuntimeException(e);
        }
        return result;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
import com.netflix.metacat.common.server.ArchaiusConfigImpl;
import com.netflix.metacat.common.server.Config;
import com.netflix.metacat.common.server.connectors.ConnectorDatabaseService;
import com.netflix.metacat.common.server.connectors.ConnectorPartitionService;
import com.netflix.metacat.common.server.connectors.ConnectorTableService;
import com.netflix.metacat.common.util.ThreadServiceManager;
import com.netflix.metacat.connector.hive.converters.HiveConnectorInfoConverter;
import com.netflix.metacat.connector.hive.util.HiveConfigConstants;

import java.util.Map;

Expand All @@ -37,31 +41,47 @@ public class HiveConnectorModule implements Module {
private final String catalogName;
private final HiveConnectorInfoConverter infoConverter;
private final IMetacatHiveClient hiveMetastoreClient;
private final boolean fastService;
private final boolean allowRenameTable;

/**
* Constructor.
*
* @param catalogName catalog name.
* @param configuration configuration properties
* @param infoConverter Hive info converter
* @param catalogName catalog name.
* @param configuration configuration properties
* @param infoConverter Hive info converter
* @param hiveMetastoreClient hive metastore client
*/
public HiveConnectorModule(final String catalogName, final Map<String, String> configuration,
                           final HiveConnectorInfoConverter infoConverter,
                           final IMetacatHiveClient hiveMetastoreClient) {
    this.catalogName = catalogName;
    this.infoConverter = infoConverter;
    this.hiveMetastoreClient = hiveMetastoreClient;
    // Both feature flags default to "false" when absent from the catalog configuration.
    this.fastService = Boolean
        .parseBoolean(configuration.getOrDefault(HiveConfigConstants.USE_FASTHIVE_SERVICE, "false"));
    this.allowRenameTable = Boolean
        .parseBoolean(configuration.getOrDefault(HiveConfigConstants.ALLOW_RENAME_TABLE, "false"));
}

@Override
public void configure(final Binder binder) {
    binder.bind(Config.class).toInstance(new ArchaiusConfigImpl());
    binder.bind(ThreadServiceManager.class).asEagerSingleton();
    binder.bind(String.class).annotatedWith(Names.named("catalogName")).toInstance(catalogName);
    binder.bind(Boolean.class).annotatedWith(Names.named("allowRenameTable")).toInstance(allowRenameTable);
    binder.bind(HiveConnectorInfoConverter.class).toInstance(infoConverter);
    binder.bind(IMetacatHiveClient.class).toInstance(hiveMetastoreClient);
    binder.bind(ConnectorDatabaseService.class).to(HiveConnectorDatabaseService.class).in(Scopes.SINGLETON);
    // Bind each service key exactly once; Guice rejects duplicate bindings for the same key.
    if (fastService) {
        binder.bind(ConnectorPartitionService.class)
            .to(HiveConnectorFastPartitionService.class).in(Scopes.SINGLETON);
        binder.bind(ConnectorTableService.class)
            .to(HiveConnectorFastTableService.class).in(Scopes.SINGLETON);
    } else {
        binder.bind(ConnectorPartitionService.class).to(HiveConnectorPartitionService.class).in(Scopes.SINGLETON);
        binder.bind(ConnectorTableService.class).to(HiveConnectorTableService.class).in(Scopes.SINGLETON);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@
* @since 1.0.0
*/
public class HiveConnectorPartitionService implements ConnectorPartitionService {

protected final String catalogName;
private final IMetacatHiveClient metacatHiveClient;
private final HiveConnectorInfoConverter hiveMetacatConverters;
private final String catalogName;


/**
* Constructor.
Expand Down Expand Up @@ -372,6 +372,7 @@ public List<String> getPartitionKeys(final List<FieldSchema> fields) {
return result;
}


protected Map<String, Partition> getPartitionsByNames(final Table table, final List<String> partitionNames) {
final String databasename = table.getDbName();
final String tablename = table.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
*/
public class HiveConnectorTableService implements ConnectorTableService {
private static final String PARAMETER_EXTERNAL = "EXTERNAL";
protected final String catalogName;
private final IMetacatHiveClient metacatHiveClient;
private final HiveConnectorInfoConverter hiveMetacatConverters;
private final HiveConnectorDatabaseService hiveConnectorDatabaseService;
private final String catalogName;
private final boolean allowRenameTable;


/**
* Constructor.
Expand All @@ -73,16 +75,19 @@ public class HiveConnectorTableService implements ConnectorTableService {
* @param metacatHiveClient hiveclient
* @param hiveConnectorDatabaseService hivedatabaseService
* @param hiveMetacatConverters converter
* @param allowRenameTable allow rename table
*/
@Inject
public HiveConnectorTableService(@Named("catalogName") final String catalogName,
                                 @Nonnull @NonNull final IMetacatHiveClient metacatHiveClient,
                                 @Nonnull @NonNull final HiveConnectorDatabaseService hiveConnectorDatabaseService,
                                 @Nonnull @NonNull final HiveConnectorInfoConverter hiveMetacatConverters,
                                 @Named("allowRenameTable") final boolean allowRenameTable) {
    this.metacatHiveClient = metacatHiveClient;
    this.hiveMetacatConverters = hiveMetacatConverters;
    this.hiveConnectorDatabaseService = hiveConnectorDatabaseService;
    this.catalogName = catalogName;
    this.allowRenameTable = allowRenameTable;
}

/**
Expand Down Expand Up @@ -389,6 +394,10 @@ public void rename(
@Nonnull @NonNull final QualifiedName oldName,
@Nonnull @NonNull final QualifiedName newName
) {
if (!allowRenameTable) {
throw new ConnectorException(
"Renaming tables is disabled in catalog " + catalogName, null);
}
try {
metacatHiveClient.rename(oldName.getDatabaseName(), oldName.getTableName(),
newName.getDatabaseName(), newName.getTableName());
Expand Down
Loading

0 comments on commit 19c2fe5

Please sign in to comment.