From 5b9e7882207357120717966d8bf7efd53c53ede5 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Thu, 15 Apr 2021 09:12:16 +0800 Subject: [PATCH] [FLINK-21660][hive] Stop using is_generic to differentiate hive and flink tables (#15155) --- .../hive/HiveDynamicTableFactory.java | 46 +--- .../connectors/hive/HiveTableFactory.java | 20 +- .../flink/table/catalog/hive/HiveCatalog.java | 252 ++++++++++-------- .../table/catalog/hive/HiveDatabaseUtil.java | 105 -------- .../catalog/hive/util/HiveTableUtil.java | 40 ++- .../hive/DDLOperationConverter.java | 12 +- .../connectors/hive/HiveDialectITCase.java | 4 - .../connectors/hive/HiveTableFactoryTest.java | 7 +- .../catalog/hive/HiveCatalogDataTypeTest.java | 5 +- .../hive/HiveCatalogGenericMetadataTest.java | 19 +- .../hive/HiveCatalogHiveMetadataTest.java | 29 +- .../table/catalog/hive/HiveCatalogITCase.java | 25 ++ .../table/catalog/hive/HiveCatalogTest.java | 12 +- .../pyflink/table/tests/test_catalog.py | 6 +- .../client/gateway/local/DependencyTest.java | 5 +- .../hive/ddl/SqlCreateHiveDatabase.java | 5 - .../parser/hive/ddl/SqlCreateHiveTable.java | 8 +- .../parser/hive/ddl/SqlCreateHiveView.java | 6 +- .../table/catalog/CatalogDatabaseImpl.java | 5 + .../table/catalog/CatalogTableBuilder.java | 7 - .../flink/table/catalog/CatalogTableImpl.java | 1 - .../table/catalog/GenericInMemoryCatalog.java | 4 +- .../flink/table/catalog/CatalogTestBase.java | 6 +- .../flink/table/catalog/CatalogDatabase.java | 7 + .../flink/table/catalog/CatalogTest.java | 8 +- .../flink/table/catalog/CatalogTestUtil.java | 27 +- 26 files changed, 286 insertions(+), 385 deletions(-) delete mode 100644 flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveDatabaseUtil.java diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java index 36b971af99013..553cdaf11b314 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java @@ -20,10 +20,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; @@ -35,11 +33,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; -import java.util.HashMap; -import java.util.Map; import java.util.Set; -import static org.apache.flink.table.catalog.CatalogPropertiesUtil.IS_GENERIC; import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE; import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE; @@ -67,27 +62,12 @@ public Set> optionalOptions() { throw new UnsupportedOperationException("Hive factory is only work for catalog."); } - private static ResolvedCatalogTable removeIsGenericFlag(Context context) { - Map newOptions = new HashMap<>(context.getCatalogTable().getOptions()); - boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC)); - // temporary table doesn't have the IS_GENERIC flag but we still consider it generic - if (!isGeneric && !context.isTemporary()) { - throw new ValidationException( - "Hive dynamic table factory now only work for generic table."); - } - return context.getCatalogTable().copy(newOptions); - } - @Override public DynamicTableSink createDynamicTableSink(Context context) { - boolean isGeneric = - Boolean.parseBoolean( - context.getCatalogTable() - .getOptions() - .get(CatalogPropertiesUtil.IS_GENERIC)); - - // temporary table doesn't have the IS_GENERIC flag but we still consider it generic - if (!isGeneric && !context.isTemporary()) { + boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); + + // we don't support temporary hive tables yet + if (isHiveTable && !context.isTemporary()) { Integer configuredParallelism = Configuration.fromMap(context.getCatalogTable().getOptions()) .get(FileSystemOptions.SINK_PARALLELISM); @@ -101,7 +81,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { return FactoryUtil.createTableSink( null, // we already in the factory of catalog context.getObjectIdentifier(), - removeIsGenericFlag(context), + context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary()); @@ -110,14 +90,10 @@ public DynamicTableSink createDynamicTableSink(Context context) { @Override public DynamicTableSource createDynamicTableSource(Context context) { - boolean isGeneric = - Boolean.parseBoolean( - context.getCatalogTable() - .getOptions() - .get(CatalogPropertiesUtil.IS_GENERIC)); - - // temporary table doesn't have the IS_GENERIC flag but we still consider it generic - if (!isGeneric && !context.isTemporary()) { + boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); + + // we don't support temporary hive tables yet + if (isHiveTable && !context.isTemporary()) { CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable()); boolean isStreamingSource = @@ -158,7 +134,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { return FactoryUtil.createTableSource( null, // we already in the factory of catalog context.getObjectIdentifier(), - removeIsGenericFlag(context), + context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary()); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java index 28667932a0d9c..4599f6b1d7e17 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java @@ -18,9 +18,9 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; @@ -51,13 +51,12 @@ public TableSource createTableSource(TableSourceFactory.Context context) { CatalogTable table = checkNotNull(context.getTable()); Preconditions.checkArgument(table instanceof CatalogTableImpl); - boolean isGeneric = - Boolean.parseBoolean(table.getOptions().get(CatalogPropertiesUtil.IS_GENERIC)); + boolean isHiveTable = HiveCatalog.isHiveTable(table.getOptions()); - // temporary table doesn't have the IS_GENERIC flag but we still consider it generic - if (!isGeneric && !context.isTemporary()) { + // we don't support temporary hive tables yet + if (isHiveTable && !context.isTemporary()) { throw new UnsupportedOperationException( - "Hive table should be resolved by HiveDynamicTableFactory."); + "Legacy TableSource for Hive is deprecated. Hive table source should be created by HiveDynamicTableFactory."); } else { return TableFactoryUtil.findAndCreateTableSource(context); } @@ -68,13 +67,12 @@ public TableSink createTableSink(TableSinkFactory.Context context) { CatalogTable table = checkNotNull(context.getTable()); Preconditions.checkArgument(table instanceof CatalogTableImpl); - boolean isGeneric = - Boolean.parseBoolean(table.getOptions().get(CatalogPropertiesUtil.IS_GENERIC)); + boolean isHiveTable = HiveCatalog.isHiveTable(table.getOptions()); - // temporary table doesn't have the IS_GENERIC flag but we still consider it generic - if (!isGeneric && !context.isTemporary()) { + // we don't support temporary hive tables yet + if (isHiveTable && !context.isTemporary()) { throw new UnsupportedOperationException( - "Hive table should be resolved by HiveDynamicTableFactory."); + "Legacy TableSink for Hive is deprecated. Hive table sink should be created by HiveDynamicTableFactory."); } else { return TableFactoryUtil.findAndCreateTableSink(context); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 259f071fd1621..87c41e895cacd 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -23,6 +23,8 @@ import org.apache.flink.connectors.hive.HiveDynamicTableFactory; import org.apache.flink.connectors.hive.HiveTableFactory; import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils; +import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase; +import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner; import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp; import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase; import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; @@ -116,9 +118,13 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_COL_CASCADE; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP; import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.IDENTIFIER; import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_COLS; import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS; import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT; @@ -126,6 +132,8 @@ import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveIntStat; import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveLongStat; import static org.apache.flink.table.catalog.hive.util.HiveTableUtil.getHadoopConfiguration; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.utils.PartitionPathUtils.unescapePathName; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -316,13 +324,9 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { Database hiveDatabase = getHiveDatabase(databaseName); - Map properties = hiveDatabase.getParameters(); + Map properties = new HashMap<>(hiveDatabase.getParameters()); - boolean isGeneric = isGenericForGet(properties); - if (!isGeneric) { - properties.put( - SqlCreateHiveDatabase.DATABASE_LOCATION_URI, hiveDatabase.getLocationUri()); - } + properties.put(SqlCreateHiveDatabase.DATABASE_LOCATION_URI, hiveDatabase.getLocationUri()); return new CatalogDatabaseImpl(properties, hiveDatabase.getDescription()); } @@ -335,7 +339,12 @@ public void createDatabase( !isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty"); checkNotNull(database, "database cannot be null"); - Database hiveDatabase = HiveDatabaseUtil.instantiateHiveDatabase(databaseName, database); + Map properties = database.getProperties(); + + String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI); + + Database hiveDatabase = + new Database(databaseName, database.getComment(), dbLocationUri, properties); try { client.createDatabase(hiveDatabase); @@ -370,7 +379,7 @@ public void alterDatabase( } try { - client.alterDatabase(databaseName, HiveDatabaseUtil.alterDatabase(hiveDB, newDatabase)); + client.alterDatabase(databaseName, alterDatabase(hiveDB, newDatabase)); } catch (TException e) { throw new CatalogException( String.format("Failed to alter database %s", databaseName), e); @@ -454,8 +463,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig UniqueConstraint pkConstraint = null; List notNullCols = new ArrayList<>(); - boolean isGeneric = isGenericForCreate(table.getOptions()); - if (!isGeneric) { + boolean isHiveTable = isHiveTable(table.getOptions()); + if (isHiveTable) { pkConstraint = table.getSchema().getPrimaryKey().orElse(null); String nnColStr = hiveTable.getParameters().remove(NOT_NULL_COLS); if (nnColStr != null) { @@ -467,6 +476,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } } + // remove the 'connector' option for hive table + hiveTable.getParameters().remove(CONNECTOR.key()); } try { @@ -566,12 +577,8 @@ public void alterTable( existingTable.getTableKind(), newCatalogTable.getTableKind())); } - boolean isGeneric = isGenericForGet(hiveTable.getParameters()); - if (isGeneric) { - hiveTable = - HiveTableUtil.alterTableViaCatalogBaseTable( - tablePath, newCatalogTable, hiveTable, hiveConf); - } else { + boolean isHiveTable = isHiveTable(hiveTable.getParameters()); + if (isHiveTable) { AlterTableOp op = HiveTableUtil.extractAlterTableOp(newCatalogTable.getOptions()); if (op == null) { // the alter operation isn't encoded as properties @@ -587,9 +594,16 @@ public void alterTable( newCatalogTable.getOptions(), hiveTable.getSd()); } + } else { + hiveTable = + HiveTableUtil.alterTableViaCatalogBaseTable( + tablePath, newCatalogTable, hiveTable, hiveConf); } - disallowChangeIsGeneric(isGeneric, isGenericForGet(hiveTable.getParameters())); + disallowChangeIsHiveTable(isHiveTable, isHiveTable(hiveTable.getParameters())); + if (isHiveTable) { + hiveTable.getParameters().remove(CONNECTOR.key()); + } try { client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable); } catch (TException e) { @@ -674,7 +688,24 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { @VisibleForTesting public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { try { - return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + boolean isHiveTable; + if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) { + // check is_generic to be backward compatible + isHiveTable = + !Boolean.parseBoolean( + table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC)); + } else { + isHiveTable = + !table.getParameters().containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key()) + && !table.getParameters() + .containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE); + } + // for hive table, we add the connector property + if (isHiveTable) { + table.getParameters().put(CONNECTOR.key(), IDENTIFIER); + } + return table; } catch (NoSuchObjectException e) { throw new TableNotExistException(getName(), tablePath); } catch (TException e) { @@ -689,38 +720,15 @@ private CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveC boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW; // Table properties - Map properties = hiveTable.getParameters(); + Map properties = new HashMap<>(hiveTable.getParameters()); - boolean isGeneric = isGenericForGet(hiveTable.getParameters()); + boolean isHiveTable = isHiveTable(properties); TableSchema tableSchema; // Partition keys List partitionKeys = new ArrayList<>(); - if (isGeneric) { - properties = retrieveFlinkProperties(properties); - DescriptorProperties tableSchemaProps = new DescriptorProperties(true); - tableSchemaProps.putProperties(properties); - ObjectPath tablePath = new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()); - // try to get table schema with both new and old (1.10) key, in order to support tables - // created in old version - tableSchema = - tableSchemaProps - .getOptionalTableSchema(Schema.SCHEMA) - .orElseGet( - () -> - tableSchemaProps - .getOptionalTableSchema("generic.table.schema") - .orElseThrow( - () -> - new CatalogException( - "Failed to get table schema from properties for generic table " - + tablePath))); - partitionKeys = tableSchemaProps.getPartitionKeys(); - // remove the schema from properties - properties = CatalogTableImpl.removeRedundant(properties, tableSchema, partitionKeys); - } else { - properties.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(false)); + if (isHiveTable) { // Table schema List fields = getNonPartitionFields(hiveConf, hiveTable); Set notNullColumns = @@ -745,6 +753,28 @@ private CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveC if (!hiveTable.getPartitionKeys().isEmpty()) { partitionKeys = getFieldNames(hiveTable.getPartitionKeys()); } + } else { + properties = retrieveFlinkProperties(properties); + DescriptorProperties tableSchemaProps = new DescriptorProperties(true); + tableSchemaProps.putProperties(properties); + ObjectPath tablePath = new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()); + // try to get table schema with both new and old (1.10) key, in order to support tables + // created in old version + tableSchema = + tableSchemaProps + .getOptionalTableSchema(Schema.SCHEMA) + .orElseGet( + () -> + tableSchemaProps + .getOptionalTableSchema("generic.table.schema") + .orElseThrow( + () -> + new CatalogException( + "Failed to get table schema from properties for generic table " + + tablePath))); + partitionKeys = tableSchemaProps.getPartitionKeys(); + // remove the schema from properties + properties = CatalogTableImpl.removeRedundant(properties, tableSchema, partitionKeys); } String comment = properties.remove(HiveCatalogConfig.COMMENT); @@ -772,20 +802,14 @@ private List getNonPartitionFields(HiveConf hiveConf, Table hiveTab } } - /** - * Filter out Hive-created properties, and return Flink-created properties. Note that - * 'is_generic' is a special key and this method will leave it as-is. - */ + /** Filter out Hive-created properties, and return Flink-created properties. */ private static Map retrieveFlinkProperties( Map hiveTableParams) { return hiveTableParams.entrySet().stream() - .filter( - e -> - e.getKey().startsWith(FLINK_PROPERTY_PREFIX) - || e.getKey().equals(CatalogPropertiesUtil.IS_GENERIC)) + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) .collect( Collectors.toMap( - e -> e.getKey().replaceFirst(FLINK_PROPERTY_PREFIX, ""), + e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), e -> e.getValue())); } @@ -822,18 +846,15 @@ public void createPartition( checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); checkNotNull(partition, "Partition cannot be null"); - boolean isGeneric = - Boolean.valueOf(partition.getProperties().get(CatalogPropertiesUtil.IS_GENERIC)); - - if (isGeneric) { - throw new CatalogException("Currently only supports non-generic CatalogPartition"); - } - Table hiveTable = getHiveTable(tablePath); + ensurePartitionedTable(tablePath, hiveTable); - ensureTableAndPartitionMatch(hiveTable, partition); + // partition doesn't have connector property, so check the table + boolean isHiveTable = isHiveTable(hiveTable.getParameters()); - ensurePartitionedTable(tablePath, hiveTable); + if (!isHiveTable) { + throw new CatalogException("Currently only supports partition for hive tables"); + } try { client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition)); @@ -1021,18 +1042,16 @@ public void alterPartition( checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); checkNotNull(newPartition, "New partition cannot be null"); - boolean isGeneric = isGenericForGet(newPartition.getProperties()); - - if (isGeneric) { - throw new CatalogException("Currently only supports non-generic CatalogPartition"); - } - // Explicitly check if the partition exists or not // because alter_partition() doesn't throw NoSuchObjectException like dropPartition() when // the target doesn't exist try { Table hiveTable = getHiveTable(tablePath); - ensureTableAndPartitionMatch(hiveTable, newPartition); + boolean isHiveTable = isHiveTable(hiveTable.getParameters()); + if (!isHiveTable) { + throw new CatalogException("Currently only supports partition for hive tables"); + } + Partition hivePartition = getHivePartition(hiveTable, partitionSpec); if (hivePartition == null) { if (ignoreIfNotExists) { @@ -1072,21 +1091,6 @@ public void alterPartition( } } - // make sure both table and partition are generic, or neither is - private static void ensureTableAndPartitionMatch( - Table hiveTable, CatalogPartition catalogPartition) { - boolean tableIsGeneric = isGenericForGet(hiveTable.getParameters()); - boolean partitionIsGeneric = isGenericForGet(catalogPartition.getProperties()); - - if (tableIsGeneric != partitionIsGeneric) { - throw new CatalogException( - String.format( - "Cannot handle %s partition for %s table", - catalogPartition.getClass().getName(), - tableIsGeneric ? "generic" : "non-generic")); - } - } - private Partition instantiateHivePartition( Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition) throws PartitionSpecInvalidException { @@ -1674,37 +1678,14 @@ public CatalogColumnStatistics getPartitionColumnStatistics( } } - public static boolean isGenericForCreate(Map properties) { - // When creating an object, a hive object needs explicitly have a key is_generic = false - // otherwise, this is a generic object if 1) the key is missing 2) is_generic = true - // this is opposite to reading an object. See getObjectIsGeneric(). - if (properties == null) { - return true; - } - boolean isGeneric; - if (!properties.containsKey(CatalogPropertiesUtil.IS_GENERIC)) { - // must be a generic object - isGeneric = true; - properties.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(true)); - } else { - isGeneric = Boolean.parseBoolean(properties.get(CatalogPropertiesUtil.IS_GENERIC)); - } - return isGeneric; - } - - public static boolean isGenericForGet(Map properties) { - // When retrieving an object, a generic object needs explicitly have a key is_generic = true - // otherwise, this is a Hive object if 1) the key is missing 2) is_generic = false - // this is opposite to creating an object. See createObjectIsGeneric() - return properties != null - && Boolean.parseBoolean( - properties.getOrDefault(CatalogPropertiesUtil.IS_GENERIC, "false")); + @VisibleForTesting + public static boolean isHiveTable(Map properties) { + return IDENTIFIER.equalsIgnoreCase(properties.get(CONNECTOR.key())); } - public static void disallowChangeIsGeneric(boolean oldIsGeneric, boolean newIsGeneric) { + private static void disallowChangeIsHiveTable(boolean oldIsHive, boolean newIsHive) { checkArgument( - oldIsGeneric == newIsGeneric, - "Changing whether a metadata object is generic is not allowed"); + oldIsHive == newIsHive, "Changing whether a table is Hive table is not allowed"); } private void alterTableViaProperties( @@ -1765,4 +1746,51 @@ private void alterTableViaProperties( public static boolean isEmbeddedMetastore(HiveConf hiveConf) { return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); } + + private static Database alterDatabase(Database hiveDB, CatalogDatabase newDatabase) { + Map params = hiveDB.getParameters(); + Map newParams = newDatabase.getProperties(); + String opStr = newParams.remove(ALTER_DATABASE_OP); + if (opStr == null) { + // by default is to alter db properties + opStr = SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_PROPS.name(); + } + String newLocation = newParams.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI); + SqlAlterHiveDatabase.AlterHiveDatabaseOp op = + SqlAlterHiveDatabase.AlterHiveDatabaseOp.valueOf(opStr); + switch (op) { + case CHANGE_PROPS: + if (params == null) { + hiveDB.setParameters(newParams); + } else { + params.putAll(newParams); + } + break; + case CHANGE_LOCATION: + hiveDB.setLocationUri(newLocation); + break; + case CHANGE_OWNER: + String ownerName = newParams.remove(DATABASE_OWNER_NAME); + String ownerType = newParams.remove(DATABASE_OWNER_TYPE); + hiveDB.setOwnerName(ownerName); + switch (ownerType) { + case SqlAlterHiveDatabaseOwner.ROLE_OWNER: + hiveDB.setOwnerType(PrincipalType.ROLE); + break; + case SqlAlterHiveDatabaseOwner.USER_OWNER: + hiveDB.setOwnerType(PrincipalType.USER); + break; + default: + throw new CatalogException("Unsupported database owner type: " + ownerType); + } + break; + default: + throw new CatalogException("Unsupported alter database op:" + opStr); + } + // is_generic is deprecated, remove it + if (hiveDB.getParameters() != null) { + hiveDB.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC); + } + return hiveDB; + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveDatabaseUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveDatabaseUtil.java deleted file mode 100644 index da8be13fbf64d..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveDatabaseUtil.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.hive; - -import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase; -import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner; -import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.exceptions.CatalogException; - -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.PrincipalType; - -import java.util.Map; - -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE; -import static org.apache.flink.table.catalog.hive.HiveCatalog.isGenericForCreate; -import static org.apache.flink.table.catalog.hive.HiveCatalog.isGenericForGet; - -/** Util methods for processing databases in HiveCatalog. */ -public class HiveDatabaseUtil { - - private HiveDatabaseUtil() {} - - static Database instantiateHiveDatabase(String databaseName, CatalogDatabase database) { - - Map properties = database.getProperties(); - - boolean isGeneric = isGenericForCreate(properties); - - String dbLocationUri = - isGeneric ? null : properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI); - - return new Database(databaseName, database.getComment(), dbLocationUri, properties); - } - - static Database alterDatabase(Database hiveDB, CatalogDatabase newDatabase) { - Map params = hiveDB.getParameters(); - boolean isGeneric = isGenericForGet(params); - if (isGeneric) { - // altering generic DB doesn't merge properties, see CatalogTest::testAlterDb - hiveDB.setParameters(newDatabase.getProperties()); - } else { - String opStr = newDatabase.getProperties().remove(ALTER_DATABASE_OP); - if (opStr == null) { - throw new CatalogException( - ALTER_DATABASE_OP + " property is missing for alter database statement"); - } - String newLocation = - newDatabase.getProperties().remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI); - Map newParams = newDatabase.getProperties(); - SqlAlterHiveDatabase.AlterHiveDatabaseOp op = - SqlAlterHiveDatabase.AlterHiveDatabaseOp.valueOf(opStr); - switch (op) { - case CHANGE_PROPS: - if (params == null) { - hiveDB.setParameters(newParams); - } else { - params.putAll(newParams); - } - break; - case CHANGE_LOCATION: - hiveDB.setLocationUri(newLocation); - break; - case CHANGE_OWNER: - String ownerName = newParams.remove(DATABASE_OWNER_NAME); - String ownerType = newParams.remove(DATABASE_OWNER_TYPE); - hiveDB.setOwnerName(ownerName); - switch (ownerType) { - case SqlAlterHiveDatabaseOwner.ROLE_OWNER: - hiveDB.setOwnerType(PrincipalType.ROLE); - break; - case SqlAlterHiveDatabaseOwner.USER_OWNER: - hiveDB.setOwnerType(PrincipalType.USER); - break; - default: - throw new CatalogException( - "Unsupported database owner type: " + ownerType); - } - break; - default: - throw new CatalogException("Unsupported alter database op:" + opStr); - } - } - return hiveDB; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java index 8f311c8b2130e..2e38220319b5c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java @@ -25,7 +25,6 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.ObjectPath; @@ -226,7 +225,7 @@ public static Optional makePartitionFilter( * Extract DDL semantics from properties and use it to initiate the table. The related * properties will be removed from the map after they're used. */ - public static void initiateTableFromProperties( + private static void initiateTableFromProperties( Table hiveTable, Map properties, HiveConf hiveConf) { extractExternal(hiveTable, properties); extractRowFormat(hiveTable.getSd(), properties); @@ -353,24 +352,13 @@ public static Table instantiateHiveTable( properties.put(HiveCatalogConfig.COMMENT, table.getComment()); } - boolean isGeneric = HiveCatalog.isGenericForCreate(properties); + boolean isHiveTable = HiveCatalog.isHiveTable(properties); // Hive table's StorageDescriptor StorageDescriptor sd = hiveTable.getSd(); HiveTableUtil.setDefaultStorageFormat(sd, hiveConf); - if (isGeneric) { - DescriptorProperties tableSchemaProps = new DescriptorProperties(true); - tableSchemaProps.putTableSchema(Schema.SCHEMA, table.getSchema()); - - if (table instanceof CatalogTable) { - tableSchemaProps.putPartitionKeys(((CatalogTable) table).getPartitionKeys()); - } - - properties.putAll(tableSchemaProps.asMap()); - properties = maskFlinkProperties(properties); - hiveTable.setParameters(properties); - } else { + if (isHiveTable) { HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf); List allColumns = HiveTableUtil.createHiveColumns(table.getSchema()); // Table columns and partition keys @@ -396,6 +384,17 @@ public static Table instantiateHiveTable( } // Table properties hiveTable.getParameters().putAll(properties); + } else { + DescriptorProperties tableSchemaProps = new DescriptorProperties(true); + tableSchemaProps.putTableSchema(Schema.SCHEMA, table.getSchema()); + + if (table instanceof CatalogTable) { + tableSchemaProps.putPartitionKeys(((CatalogTable) table).getPartitionKeys()); + } + + properties.putAll(tableSchemaProps.asMap()); + properties = maskFlinkProperties(properties); + hiveTable.setParameters(properties); } if (table instanceof CatalogView) { @@ -413,18 +412,11 @@ public static Table instantiateHiveTable( /** * Add a prefix to Flink-created properties to distinguish them from Hive-created properties. - * Note that 'is_generic' is a special key and this method will leave it as-is. */ - public static Map maskFlinkProperties(Map properties) { + private static Map maskFlinkProperties(Map properties) { return properties.entrySet().stream() .filter(e -> e.getKey() != null && e.getValue() != null) - .map( - e -> - new Tuple2<>( - e.getKey().equals(CatalogPropertiesUtil.IS_GENERIC) - ? e.getKey() - : FLINK_PROPERTY_PREFIX + e.getKey(), - e.getValue())) + .map(e -> new Tuple2<>(FLINK_PROPERTY_PREFIX + e.getKey(), e.getValue())) .collect(Collectors.toMap(t -> t.f0, t -> t.f1)); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java index 7e8e73d2bbda5..e06d99470dede 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java @@ -21,6 +21,7 @@ import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils; import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; @@ -36,7 +37,6 @@ import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.CatalogView; @@ -51,6 +51,7 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.Operation; @@ -308,7 +309,7 @@ private Operation convertCreateAlterView(HiveParserCreateViewDesc desc) { props.putAll(baseTable.getOptions()); comment = baseTable.getComment(); } else { - markNonGeneric(props); + markHiveConnector(props); comment = desc.getComment(); if (desc.getTblProps() != null) { props.putAll(desc.getTblProps()); @@ -368,7 +369,7 @@ private Operation convertCreateTable(HiveParserCreateTableDesc desc) { if (desc.getTblProps() != null) { props.putAll(desc.getTblProps()); } - markNonGeneric(props); + markHiveConnector(props); // external if (desc.isExternal()) { props.put(TABLE_IS_EXTERNAL, "true"); @@ -708,7 +709,6 @@ private Operation convertCreateDatabase(CreateDatabaseDesc desc) { if (desc.getDatabaseProperties() != null) { props.putAll(desc.getDatabaseProperties()); } - markNonGeneric(props); if (desc.getLocationUri() != null) { props.put(DATABASE_LOCATION_URI, desc.getLocationUri()); } @@ -720,8 +720,8 @@ private Operation convertCreateDatabase(CreateDatabaseDesc desc) { desc.getIfNotExists()); } - private void markNonGeneric(Map props) { - props.put(CatalogPropertiesUtil.IS_GENERIC, "false"); + private void markHiveConnector(Map props) { + props.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); } private CatalogBaseTable getCatalogBaseTable(ObjectIdentifier tableIdentifier) { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index 19cb79acf12c9..0b56b1bb84823 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -27,7 +27,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.HiveCatalog; @@ -149,7 +148,6 @@ public void testCreateDatabase() throws Exception { tableEnv.executeSql("create database db1 comment 'db1 comment'"); Database db = hiveCatalog.getHiveDatabase("db1"); assertEquals("db1 comment", db.getDescription()); - assertFalse(Boolean.parseBoolean(db.getParameters().get(CatalogPropertiesUtil.IS_GENERIC))); String db2Location = warehouse + "/db2_location"; tableEnv.executeSql( @@ -167,8 +165,6 @@ public void testAlterDatabase() throws Exception { tableEnv.executeSql("create database db1 with dbproperties('k1'='v1')"); tableEnv.executeSql("alter database db1 set dbproperties ('k1'='v11','k2'='v2')"); Database db = hiveCatalog.getHiveDatabase("db1"); - // there's an extra is_generic property - assertEquals(3, db.getParametersSize()); assertEquals("v11", db.getParameters().get("k1")); assertEquals("v2", db.getParameters().get("k2")); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java index 032db8d40971a..592e1d25708c1 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java @@ -19,10 +19,10 @@ package org.apache.flink.connectors.hive; import org.apache.flink.configuration.Configuration; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.Column; @@ -77,8 +77,7 @@ public void testGenericTable() throws Exception { .build(); Map properties = new HashMap<>(); - properties.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(true)); - properties.put("connector", "COLLECTION"); + properties.put(FactoryUtil.CONNECTOR.key(), "COLLECTION"); catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true); ObjectPath path = new ObjectPath("mydb", "mytable"); @@ -114,7 +113,7 @@ public void testHiveTable() throws Exception { Column.physical("age", DataTypes.INT())); Map properties = new HashMap<>(); - properties.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(false)); + properties.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true); ObjectPath path = new ObjectPath("mydb", "mytable"); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java index 9cc5bd7359e49..d5ea1be94a329 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java @@ -18,15 +18,16 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.BinaryType; @@ -199,7 +200,7 @@ private CatalogTable createCatalogTable(DataType[] types) { new HashMap() { { put("is_streaming", "false"); - put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(false)); + put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); } }, ""); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java index 7153ee723e0da..4aac2652edf10 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java @@ -40,9 +40,10 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Map; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; /** Test for HiveCatalog on generic metadata. */ public class HiveCatalogGenericMetadataTest extends HiveCatalogMetadataTestBase { @@ -96,7 +97,7 @@ public void testTableSchemaCompatibility() throws Exception { tablePath.getDatabaseName(), tablePath.getObjectName()); hiveTable.setDbName(tablePath.getDatabaseName()); hiveTable.setTableName(tablePath.getObjectName()); - hiveTable.getParameters().putAll(getBatchTableProperties()); + setLegacyGeneric(hiveTable.getParameters()); hiveTable.getParameters().put("flink.generic.table.schema.0.name", "ti"); hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "TINYINT"); hiveTable.getParameters().put("flink.generic.table.schema.1.name", "si"); @@ -118,9 +119,7 @@ public void testTableSchemaCompatibility() throws Exception { hiveTable.getParameters().put("flink.generic.table.schema.7.data-type", "DOUBLE"); ((HiveCatalog) catalog).client.createTable(hiveTable); CatalogBaseTable catalogBaseTable = catalog.getTable(tablePath); - assertTrue( - Boolean.parseBoolean( - catalogBaseTable.getOptions().get(CatalogPropertiesUtil.IS_GENERIC))); + assertFalse(HiveCatalog.isHiveTable(catalogBaseTable.getOptions())); TableSchema expectedSchema = TableSchema.builder() .fields( @@ -145,7 +144,7 @@ public void testTableSchemaCompatibility() throws Exception { tablePath.getDatabaseName(), tablePath.getObjectName()); hiveTable.setDbName(tablePath.getDatabaseName()); hiveTable.setTableName(tablePath.getObjectName()); - hiveTable.getParameters().putAll(getBatchTableProperties()); + setLegacyGeneric(hiveTable.getParameters()); hiveTable.setTableName(tablePath.getObjectName()); hiveTable.getParameters().put("flink.generic.table.schema.0.name", "c"); hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "CHAR(265)"); @@ -195,7 +194,7 @@ public void testTableSchemaCompatibility() throws Exception { tablePath.getDatabaseName(), tablePath.getObjectName()); hiveTable.setDbName(tablePath.getDatabaseName()); hiveTable.setTableName(tablePath.getObjectName()); - hiveTable.getParameters().putAll(getBatchTableProperties()); + setLegacyGeneric(hiveTable.getParameters()); hiveTable.setTableName(tablePath.getObjectName()); hiveTable.getParameters().put("flink.generic.table.schema.0.name", "dt"); hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "DATE"); @@ -241,7 +240,7 @@ public void testTableSchemaCompatibility() throws Exception { tablePath.getDatabaseName(), tablePath.getObjectName()); hiveTable.setDbName(tablePath.getDatabaseName()); hiveTable.setTableName(tablePath.getObjectName()); - hiveTable.getParameters().putAll(getBatchTableProperties()); + setLegacyGeneric(hiveTable.getParameters()); hiveTable.setTableName(tablePath.getObjectName()); hiveTable.getParameters().put("flink.generic.table.schema.0.name", "a"); hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "ARRAY"); @@ -456,4 +455,8 @@ protected CatalogFunction createAnotherFunction() { return new CatalogFunctionImpl( TestSimpleUDF.class.getCanonicalName(), FunctionLanguage.SCALA); } + + private static void setLegacyGeneric(Map properties) { + properties.put(CatalogPropertiesUtil.IS_GENERIC, "true"); + } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java index 1273484292a8d..b8ee7be5757df 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java @@ -18,18 +18,16 @@ package org.apache.flink.table.catalog.hive; -import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp; import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.constraints.UniqueConstraint; -import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.CatalogTestUtil; @@ -44,6 +42,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.catalog.stats.Date; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.util.StringUtils; @@ -59,7 +58,6 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -80,27 +78,6 @@ public static void init() { public void testCreateTable_Streaming() throws Exception {} - @Override - @Test - public void testAlterDb() throws Exception { - // altering Hive DB merges properties, which is different from generic DB - CatalogDatabase db = createDb(); - catalog.createDatabase(db1, db, false); - - CatalogDatabase newDb = createAnotherDb(); - newDb.getProperties().put(ALTER_DATABASE_OP, AlterHiveDatabaseOp.CHANGE_PROPS.name()); - catalog.alterDatabase(db1, newDb, false); - - Map mergedProps = new HashMap<>(db.getProperties()); - mergedProps.putAll(newDb.getProperties()); - - assertTrue( - catalog.getDatabase(db1) - .getProperties() - .entrySet() - .containsAll(mergedProps.entrySet())); - } - @Test // verifies that input/output formats and SerDe are set for Hive tables public void testCreateTable_StorageFormatSet() throws Exception { @@ -266,7 +243,7 @@ private void checkStatistics(int inputStat, int expectStat) throws Exception { catalog.dropTable(path1, true); Map properties = new HashMap<>(); - properties.put(CatalogPropertiesUtil.IS_GENERIC, "false"); + properties.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); properties.put(StatsSetupConst.ROW_COUNT, String.valueOf(inputStat)); properties.put(StatsSetupConst.NUM_FILES, String.valueOf(inputStat)); properties.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(inputStat)); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java index cdc5d1048baf7..6cd1406b07af7 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java @@ -27,6 +27,8 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableBuilder; import org.apache.flink.table.catalog.ObjectPath; @@ -34,6 +36,7 @@ import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.FormatDescriptor; import org.apache.flink.table.descriptors.OldCsv; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; @@ -483,4 +486,26 @@ public void testTemporaryGenericTable() throws Exception { "create temporary table blackhole(i int) with ('connector'='blackhole')"); tableEnv.executeSql("insert into blackhole select * from datagen").await(); } + + @Test + public void testCreateTableLike() throws Exception { + TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(); + tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); + tableEnv.useCatalog(hiveCatalog.getName()); + tableEnv.executeSql("create table generic_table (x int) with ('connector'='COLLECTION')"); + tableEnv.useCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG); + tableEnv.executeSql( + String.format( + "create table copy like `%s`.`default`.generic_table", + hiveCatalog.getName())); + Catalog builtInCat = tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG).get(); + CatalogBaseTable catalogTable = + builtInCat.getTable( + new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "copy")); + assertEquals(1, catalogTable.getOptions().size()); + assertEquals("COLLECTION", catalogTable.getOptions().get(FactoryUtil.CONNECTOR.key())); + assertEquals(1, catalogTable.getSchema().getFieldCount()); + assertEquals("x", catalogTable.getSchema().getFieldNames()[0]); + assertEquals(DataTypes.INT(), catalogTable.getSchema().getFieldDataTypes()[0]); + } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java index 5bb722104f6db..056036427ba27 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -26,6 +27,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.table.descriptors.FileSystem; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.AfterClass; @@ -35,7 +37,9 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** Test for HiveCatalog. */ @@ -71,7 +75,7 @@ schema, new FileSystem().path("/test_path").toProperties(), null), HiveTestUtils.createHiveConf()); Map prop = hiveTable.getParameters(); - assertEquals(prop.remove(CatalogPropertiesUtil.IS_GENERIC), String.valueOf("true")); + assertFalse(HiveCatalog.isHiveTable(prop)); assertTrue( prop.keySet().stream() .allMatch(k -> k.startsWith(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX))); @@ -81,7 +85,7 @@ schema, new FileSystem().path("/test_path").toProperties(), null), public void testCreateHiveTable() { Map map = new HashMap<>(new FileSystem().path("/test_path").toProperties()); - map.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(false)); + map.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); Table hiveTable = HiveTableUtil.instantiateHiveTable( @@ -90,7 +94,7 @@ public void testCreateHiveTable() { HiveTestUtils.createHiveConf()); Map prop = hiveTable.getParameters(); - assertEquals(prop.remove(CatalogPropertiesUtil.IS_GENERIC), String.valueOf(false)); + assertTrue(HiveCatalog.isHiveTable(prop)); assertTrue( prop.keySet().stream() .noneMatch(k -> k.startsWith(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX))); @@ -104,7 +108,7 @@ public void testRetrieveFlinkProperties() throws Exception { Map properties = new HashMap<>(new FileSystem().path("/test_path").toProperties()); - properties.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(true)); + properties.put(CONNECTOR.key(), "jdbc"); properties.put("url", "jdbc:clickhouse://host:port/testUrl1"); properties.put("flink.url", "jdbc:clickhouse://host:port/testUrl2"); diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py index dcc1bc7d9cea2..f6f65013e6808 100644 --- a/flink-python/pyflink/table/tests/test_catalog.py +++ b/flink-python/pyflink/table/tests/test_catalog.py @@ -258,10 +258,10 @@ def test_alter_db(self): new_db = self.create_another_db() self.catalog.alter_database(self.db1, new_db, False) + merged_properties = db.get_properties().copy() + merged_properties.update(new_db.get_properties()) new_properties = self.catalog.get_database(self.db1).get_properties() - old_properties = db.get_properties() - self.assertFalse(all(k in new_properties for k in old_properties.keys())) - self.check_catalog_database_equals(new_db, self.catalog.get_database(self.db1)) + self.assertTrue(all(kv in new_properties.items() for kv in merged_properties.items())) def test_alter_db_database_not_exist_exception(self): with self.assertRaises(DatabaseNotExistException): diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 3e538f559b28d..144e6ea70505f 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -22,12 +22,12 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.CommonCatalogOptions; @@ -50,6 +50,7 @@ import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase; import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.ModuleFactory; import org.apache.flink.table.module.Module; import org.apache.flink.table.operations.Operation; @@ -313,7 +314,7 @@ public Catalog createCatalog(Context context) { private ResolvedCatalogTable createResolvedTable( String[] fieldNames, DataType[] fieldDataTypes) { final Map options = new HashMap<>(); - options.put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(false)); + options.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); final CatalogTable origin = CatalogTable.of( Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(), diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java index 61d9c38fd247c..d3a8e3656d5f1 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java @@ -21,7 +21,6 @@ import org.apache.flink.sql.parser.ddl.SqlCreateDatabase; import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.hive.impl.ParseException; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlIdentifier; @@ -53,11 +52,7 @@ public SqlCreateHiveDatabase( HiveDDLUtils.checkReservedDBProperties(propertyList), HiveDDLUtils.unescapeStringLiteral(comment), ifNotExists); - HiveDDLUtils.ensureNonGeneric(propertyList); originPropList = new SqlNodeList(propertyList.getList(), propertyList.getParserPosition()); - // mark it as a hive database - propertyList.add( - HiveDDLUtils.toTableOption(CatalogPropertiesUtil.IS_GENERIC, "false", pos)); if (location != null) { propertyList.add( new SqlTableOption( diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java index ce3683bf82801..64420a382fa56 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java @@ -24,7 +24,7 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; import org.apache.flink.sql.parser.hive.impl.ParseException; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlIdentifier; @@ -43,6 +43,8 @@ /** CREATE Table DDL for Hive dialect. */ public class SqlCreateHiveTable extends SqlCreateTable { + public static final String IDENTIFIER = "hive"; + public static final String TABLE_LOCATION_URI = "hive.location-uri"; public static final String TABLE_IS_EXTERNAL = "hive.is-external"; public static final String PK_CONSTRAINT_TRAIT = "hive.pk.constraint.trait"; @@ -96,9 +98,7 @@ public SqlCreateHiveTable( HiveDDLUtils.convertDataTypes(partColList); originPropList = new SqlNodeList(propertyList.getList(), propertyList.getParserPosition()); // mark it as a hive table - HiveDDLUtils.ensureNonGeneric(propertyList); - propertyList.add( - HiveDDLUtils.toTableOption(CatalogPropertiesUtil.IS_GENERIC, "false", pos)); + propertyList.add(HiveDDLUtils.toTableOption(FactoryUtil.CONNECTOR.key(), IDENTIFIER, pos)); // set external this.isExternal = isExternal; if (isExternal) { diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java index 0d9771961af61..a366b4d641792 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java @@ -19,7 +19,7 @@ package org.apache.flink.sql.parser.hive.ddl; import org.apache.flink.sql.parser.ddl.SqlCreateView; -import org.apache.flink.table.catalog.CatalogPropertiesUtil; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlIdentifier; @@ -59,7 +59,9 @@ public SqlCreateHiveView( HiveDDLUtils.unescapeProperties(properties); originPropList = new SqlNodeList(properties.getList(), properties.getParserPosition()); // mark it as a hive view - properties.add(HiveDDLUtils.toTableOption(CatalogPropertiesUtil.IS_GENERIC, "false", pos)); + properties.add( + HiveDDLUtils.toTableOption( + FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER, pos)); } @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogDatabaseImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogDatabaseImpl.java index d7bce0c9ff90d..39aac2529204e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogDatabaseImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogDatabaseImpl.java @@ -58,6 +58,11 @@ public String getComment() { * @return a copy of CatalogDatabase instance */ public CatalogDatabase copy() { + return copy(getProperties()); + } + + @Override + public CatalogDatabase copy(Map properties) { return new CatalogDatabaseImpl(new HashMap<>(properties), comment); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java index 33ec94e93c272..d7428b88cffc6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java @@ -63,8 +63,6 @@ public final class CatalogTableBuilder extends TableDescriptor partitionKeys = new ArrayList<>(); private Map properties = Collections.emptyMap(); @@ -72,9 +70,6 @@ public final class CatalogTableBuilder extends TableDescriptor additionalProperties() { DescriptorProperties descriptorProperties = new DescriptorProperties(); - descriptorProperties.putBoolean(CatalogPropertiesUtil.IS_GENERIC, isGeneric); - descriptorProperties.putProperties(this.properties); return descriptorProperties.asMap(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java index 2ae22d852612b..e10fc1edfff6e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java @@ -82,7 +82,6 @@ public Map toProperties() { descriptor.putPartitionKeys(getPartitionKeys()); Map properties = new HashMap<>(getOptions()); - properties.remove(CatalogPropertiesUtil.IS_GENERIC); descriptor.putProperties(properties); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index 9de620f1cdfeb..32e0561eff5aa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -174,7 +174,9 @@ public void alterDatabase( newDatabase.getClass().getName())); } - databases.put(databaseName, newDatabase.copy()); + Map mergedProperties = new HashMap<>(existingDatabase.getProperties()); + mergedProperties.putAll(newDatabase.getProperties()); + databases.put(databaseName, newDatabase.copy(mergedProperties)); } else if (!ignoreIfNotExists) { throw new DatabaseNotExistException(getName(), databaseName); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java index 4a252143c8fca..a588343749c15 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.factories.FactoryUtil; import java.util.Collections; import java.util.HashMap; @@ -33,7 +34,6 @@ public CatalogDatabase createDb() { new HashMap() { { put("k1", "v1"); - putAll(getGenericFlag(isGeneric())); } }, TEST_COMMENT); @@ -45,7 +45,6 @@ public CatalogDatabase createAnotherDb() { new HashMap() { { put("k2", "v2"); - putAll(getGenericFlag(isGeneric())); } }, TEST_COMMENT); @@ -165,7 +164,8 @@ protected Map getStreamingTableProperties() { private Map getGenericFlag(boolean isGeneric) { return new HashMap() { { - put(CatalogPropertiesUtil.IS_GENERIC, String.valueOf(isGeneric)); + String connector = isGeneric ? "COLLECTION" : "hive"; + put(FactoryUtil.CONNECTOR.key(), connector); } }; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java index e46406e539010..4265a8358d490 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java @@ -40,6 +40,13 @@ public interface CatalogDatabase { */ CatalogDatabase copy(); + /** + * Returns a copy of this {@code CatalogDatabase} with the given properties. + * + * @return a new copy of this database with replaced properties + */ + CatalogDatabase copy(Map properties); + /** * Get a brief description of the database. * diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 1fc421fbaa0e9..4c968eab39229 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -203,12 +203,14 @@ public void testAlterDb() throws Exception { CatalogDatabase newDb = createAnotherDb(); catalog.alterDatabase(db1, newDb, false); - assertFalse( + Map mergedProps = new HashMap<>(db.getProperties()); + mergedProps.putAll(newDb.getProperties()); + + assertTrue( catalog.getDatabase(db1) .getProperties() .entrySet() - .containsAll(db.getProperties().entrySet())); - CatalogTestUtil.checkEquals(newDb, catalog.getDatabase(db1)); + .containsAll(mergedProps.entrySet())); } @Test diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java index de42355517af6..9675e622bb614 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.catalog.stats.Date; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.plan.stats.TableStats; import java.util.Map; @@ -47,19 +48,15 @@ public static void checkEquals(CatalogTable t1, CatalogTable t2) { assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys()); assertEquals(t1.isPartitioned(), t2.isPartitioned()); - assertEquals( - t1.getOptions().get(CatalogPropertiesUtil.IS_GENERIC), - t2.getOptions().get(CatalogPropertiesUtil.IS_GENERIC)); - // Hive tables may have properties created by itself // thus properties of Hive table is a super set of those in its corresponding Flink table - if (Boolean.parseBoolean(t1.getOptions().get(CatalogPropertiesUtil.IS_GENERIC))) { - assertEquals(t1.getOptions(), t2.getOptions()); - } else { + if (isHiveTable(t1.getOptions())) { assertTrue( t2.getOptions().keySet().stream() .noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX))); assertTrue(t2.getOptions().entrySet().containsAll(t1.getOptions().entrySet())); + } else { + assertEquals(t1.getOptions(), t2.getOptions()); } } @@ -72,13 +69,13 @@ public static void checkEquals(CatalogView v1, CatalogView v2) { // Hive tables may have properties created by itself // thus properties of Hive table is a super set of those in its corresponding Flink table - if (Boolean.parseBoolean(v1.getOptions().get(CatalogPropertiesUtil.IS_GENERIC))) { - assertEquals(v1.getOptions(), v2.getOptions()); - } else { + if (isHiveTable(v1.getOptions())) { assertTrue( v2.getOptions().keySet().stream() .noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX))); assertTrue(v2.getOptions().entrySet().containsAll(v1.getOptions().entrySet())); + } else { + assertEquals(v1.getOptions(), v2.getOptions()); } } @@ -88,10 +85,10 @@ public static void checkEquals(CatalogPartition p1, CatalogPartition p2) { // Hive tables may have properties created by itself // thus properties of Hive table is a super set of those in its corresponding Flink table - if (Boolean.valueOf(p1.getProperties().get(CatalogPropertiesUtil.IS_GENERIC))) { - assertEquals(p1.getProperties(), p2.getProperties()); - } else { + if (isHiveTable(p1.getProperties())) { assertTrue(p2.getProperties().entrySet().containsAll(p1.getProperties().entrySet())); + } else { + assertEquals(p1.getProperties(), p2.getProperties()); } } @@ -209,4 +206,8 @@ private static void checkEquals( private static void checkEquals(Date v1, Date v2) { assertEquals(v1.getDaysSinceEpoch(), v2.getDaysSinceEpoch()); } + + private static boolean isHiveTable(Map properties) { + return "hive".equalsIgnoreCase(properties.get(FactoryUtil.CONNECTOR.key())); + } }