[FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

This PR enables GenericHiveMetastoreCatalog to operate on Flink tables, using the Hive metastore as storage. Flink tables are stored as Hive tables in the metastore, and GenericHiveMetastoreCatalog converts between Flink and Hive tables on read and write.
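A rough sketch of the intended round trip (illustrative only; the catalog construction and the flinkTable instance are assumptions not shown in this diff, while the createTable()/getTable() signatures come from the code below):

    // Hypothetical setup; the constructor/wiring is not part of this commit.
    GenericHiveMetastoreCatalog catalog = ...; // backed by a Hive metastore client
    ObjectPath path = new ObjectPath("default", "orders");

    // Persist a Flink table as a Hive table in the metastore ...
    catalog.createTable(path, flinkTable, false);

    // ... and read it back, converted to a Flink CatalogBaseTable.
    CatalogBaseTable table = catalog.getTable(path);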

This closes apache#8329
bowenli86 committed May 6, 2019
1 parent 7b46f60 commit cf20197
Showing 17 changed files with 924 additions and 342 deletions.
5 changes: 1 addition & 4 deletions flink-connectors/flink-connector-hive/pom.xml
@@ -196,10 +196,6 @@ under the License.
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.joshelser</groupId>
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
@@ -387,6 +383,7 @@ under the License.
<include>commons-dbcp:commons-dbcp</include>
<include>commons-pool:commons-pool</include>
<include>commons-beanutils:commons-beanutils</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.jolbox:bonecp</include>
<include>org.apache.hive:*</include>
<include>org.apache.thrift:libthrift</include>
GenericHiveMetastoreCatalog.java
@@ -49,6 +49,8 @@
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
client.dropTable(
tablePath.getDatabaseName(),
tablePath.getObjectName(),
// Indicate whether associated data should be deleted.
// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
true,
ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
} catch (TException e) {
throw new CatalogException(
String.format("Failed to drop table %s", tablePath.getFullName()), e);
}
}

@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
throws TableNotExistException, TableAlreadyExistException, CatalogException {
try {
// alter_table() doesn't throw a clear exception when the target table doesn't exist. Thus, check table existence explicitly
if (tableExists(tablePath)) {
ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
// alter_table() doesn't throw a clear exception when the new table already exists. Thus, check table existence explicitly
if (tableExists(newPath)) {
throw new TableAlreadyExistException(catalogName, newPath);
} else {
Table table = getHiveTable(tablePath);
table.setTableName(newTableName);
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
}
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
} catch (TException e) {
throw new CatalogException(
String.format("Failed to rename table %s", tablePath.getFullName()), e);
}
}

@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
} else {
try {
client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table));
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, tablePath);
}
} catch (TException e) {
throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
}
}
}

@Override
public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
} else {
// IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case
// Thus we have to translate alterTable() into (dropTable() + createTable())
dropTable(tablePath, false);
try {
createTable(tablePath, newTable, false);
} catch (TableAlreadyExistException | DatabaseNotExistException e) {
// These exceptions wouldn't be thrown unless a concurrent operation is triggered in Hive
throw new CatalogException(
String.format("Failed to alter table %s", tablePath), e);
}
}
}

@Override
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
try {
return client.getAllTables(databaseName);
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(catalogName, databaseName);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list tables in database %s", databaseName), e);
}
}

@Override
@@ -262,13 +330,33 @@ public List<String> listViews(String databaseName) throws DatabaseNotExistExcept
}

@Override
public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
Table hiveTable = getHiveTable(tablePath);

return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
}

protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
try {
return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
} catch (NoSuchObjectException e) {
throw new TableNotExistException(catalogName, tablePath);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
}
}

@Override
public boolean tableExists(ObjectPath objectPath) throws CatalogException {
throw new UnsupportedOperationException();
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
try {
return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
} catch (UnknownDBException e) {
return false;
} catch (TException e) {
throw new CatalogException(
String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
}
}

// ------ partitions ------
GenericHiveMetastoreCatalogUtil.java
@@ -18,32 +18,178 @@

package org.apache.flink.table.catalog.hive;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.plan.stats.TableStats;

import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.stream.Collectors;

/**
* Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
*/
public class GenericHiveMetastoreCatalogUtil {

// Prefix used to distinguish Flink-created properties from Hive-created ones,
// as the Hive metastore adds its own properties upon table creation and upon migration between metastore versions.
private static final String FLINK_PROPERTY_PREFIX = "flink.";

// Flink tables should be stored as 'external' tables in Hive metastore
private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
put("EXTERNAL", "TRUE");
}};

private GenericHiveMetastoreCatalogUtil() {
}

// ------ Utils ------

/**
* Creates a Hive database from CatalogDatabase.
* Creates a Hive database from a CatalogDatabase.
*
* @param databaseName name of the database
* @param catalogDatabase the CatalogDatabase instance
* @return a Hive database
*/
public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
Map<String, String> props = db.getProperties();
public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
return new Database(
dbName,
db.getDescription().isPresent() ? db.getDescription().get() : null,
databaseName,
catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
null,
props);
catalogDatabase.getProperties());
}

/**
* Creates a Hive table from a CatalogBaseTable.
* TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
*
* @param tablePath path of the table
* @param table the CatalogBaseTable instance
* @return a Hive table
*/
public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
Map<String, String> properties = new HashMap<>(table.getProperties());

// Table comment
properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());

Table hiveTable = new Table();
hiveTable.setDbName(tablePath.getDatabaseName());
hiveTable.setTableName(tablePath.getObjectName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));

// Table properties
hiveTable.setParameters(buildFlinkProperties(properties));
hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);

// Hive table's StorageDescriptor
StorageDescriptor sd = new StorageDescriptor();
sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));

List<FieldSchema> allColumns = createHiveColumns(table.getSchema());

// Table columns and partition keys
if (table instanceof CatalogTable) {
CatalogTable catalogTable = (CatalogTable) table;

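// Note: this split assumes partition keys are the trailing columns of the Flink
// table schema; regular columns come first, partition columns last.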
if (catalogTable.isPartitioned()) {
int partitionKeySize = catalogTable.getPartitionKeys().size();
List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());

sd.setCols(regularColumns);
hiveTable.setPartitionKeys(partitionColumns);
} else {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}

hiveTable.setSd(sd);
} else {
// TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
throw new UnsupportedOperationException();
}

return hiveTable;
}

/**
* Creates a CatalogBaseTable from a Hive table.
* TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
*
* @param hiveTable the Hive table
* @return a CatalogBaseTable
*/
public static CatalogBaseTable createCatalogTable(Table hiveTable) {
// Table schema
TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());

// Table properties
Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());

// Table comment
String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);

// Partition keys
List<String> partitionKeys = new ArrayList<>();

if (hiveTable.getPartitionKeys() != null && !hiveTable.getPartitionKeys().isEmpty()) {
partitionKeys = hiveTable.getPartitionKeys().stream()
.map(fs -> fs.getName())
.collect(Collectors.toList());
}

return new GenericCatalogTable(
tableSchema, new TableStats(0), partitionKeys, properties, comment);
}

/**
* Creates Hive columns from a Flink TableSchema.
*/
private static List<FieldSchema> createHiveColumns(TableSchema schema) {
String[] fieldNames = schema.getFieldNames();
TypeInformation<?>[] fieldTypes = schema.getFieldTypes();

List<FieldSchema> columns = new ArrayList<>(fieldNames.length);

for (int i = 0; i < fieldNames.length; i++) {
columns.add(
new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
}

return columns;
}

/**
* Filter out Hive-created properties, and return Flink-created properties.
*/
private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
return hiveTableParams.entrySet().stream()
.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
.collect(Collectors.toMap(e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), e -> e.getValue()));
}

/**
* Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
*/
public static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
return properties.entrySet().stream()
.filter(e -> e.getKey() != null && e.getValue() != null)
.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
}
}
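For intuition, buildFlinkProperties() and retrieveFlinkProperties() are meant to be inverses of each other once Hive-created keys are filtered out. A minimal sketch of the round trip, assuming access to the two helpers above:

    Map<String, String> flinkProps = new HashMap<>();
    flinkProps.put("connector.type", "filesystem");

    // Stored in the metastore as {"flink.connector.type": "filesystem"}
    Map<String, String> hiveParams = buildFlinkProperties(flinkProps);
    hiveParams.put("EXTERNAL", "TRUE"); // Hive-side property, no "flink." prefix

    // Reading back drops the unprefixed Hive key and strips the "flink." prefix,
    // recovering the original Flink properties.
    Map<String, String> restored = retrieveFlinkProperties(hiveParams);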