Skip to content

Commit

Permalink
[FLINK-21660][hive] Stop using is_generic to differentiate hive and f…
Browse files Browse the repository at this point in the history
…link tables (apache#15155)
  • Loading branch information
lirui-apache committed Apr 15, 2021
1 parent 9fd6ecf commit 5b9e788
Show file tree
Hide file tree
Showing 26 changed files with 286 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
Expand All @@ -35,11 +33,8 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.table.catalog.CatalogPropertiesUtil.IS_GENERIC;
import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;

Expand Down Expand Up @@ -67,27 +62,12 @@ public Set<ConfigOption<?>> optionalOptions() {
throw new UnsupportedOperationException("Hive factory is only work for catalog.");
}

private static ResolvedCatalogTable removeIsGenericFlag(Context context) {
Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC));
// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
throw new ValidationException(
"Hive dynamic table factory now only work for generic table.");
}
return context.getCatalogTable().copy(newOptions);
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
boolean isGeneric =
Boolean.parseBoolean(
context.getCatalogTable()
.getOptions()
.get(CatalogPropertiesUtil.IS_GENERIC));

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());

// we don't support temporary hive tables yet
if (isHiveTable && !context.isTemporary()) {
Integer configuredParallelism =
Configuration.fromMap(context.getCatalogTable().getOptions())
.get(FileSystemOptions.SINK_PARALLELISM);
Expand All @@ -101,7 +81,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context),
context.getCatalogTable(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
Expand All @@ -110,14 +90,10 @@ public DynamicTableSink createDynamicTableSink(Context context) {

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
boolean isGeneric =
Boolean.parseBoolean(
context.getCatalogTable()
.getOptions()
.get(CatalogPropertiesUtil.IS_GENERIC));

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());

// we don't support temporary hive tables yet
if (isHiveTable && !context.isTemporary()) {
CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable());

boolean isStreamingSource =
Expand Down Expand Up @@ -158,7 +134,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return FactoryUtil.createTableSource(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context),
context.getCatalogTable(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
Expand Down Expand Up @@ -51,13 +51,12 @@ public TableSource createTableSource(TableSourceFactory.Context context) {
CatalogTable table = checkNotNull(context.getTable());
Preconditions.checkArgument(table instanceof CatalogTableImpl);

boolean isGeneric =
Boolean.parseBoolean(table.getOptions().get(CatalogPropertiesUtil.IS_GENERIC));
boolean isHiveTable = HiveCatalog.isHiveTable(table.getOptions());

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
// we don't support temporary hive tables yet
if (isHiveTable && !context.isTemporary()) {
throw new UnsupportedOperationException(
"Hive table should be resolved by HiveDynamicTableFactory.");
"Legacy TableSource for Hive is deprecated. Hive table source should be created by HiveDynamicTableFactory.");
} else {
return TableFactoryUtil.findAndCreateTableSource(context);
}
Expand All @@ -68,13 +67,12 @@ public TableSink createTableSink(TableSinkFactory.Context context) {
CatalogTable table = checkNotNull(context.getTable());
Preconditions.checkArgument(table instanceof CatalogTableImpl);

boolean isGeneric =
Boolean.parseBoolean(table.getOptions().get(CatalogPropertiesUtil.IS_GENERIC));
boolean isHiveTable = HiveCatalog.isHiveTable(table.getOptions());

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
// we don't support temporary hive tables yet
if (isHiveTable && !context.isTemporary()) {
throw new UnsupportedOperationException(
"Hive table should be resolved by HiveDynamicTableFactory.");
"Legacy TableSink for Hive is deprecated. Hive table sink should be created by HiveDynamicTableFactory.");
} else {
return TableFactoryUtil.findAndCreateTableSink(context);
}
Expand Down
Loading

0 comments on commit 5b9e788

Please sign in to comment.