
Commit

[FLINK-15912][table] Support create table source/sink by context in hive connector
JingsongLi authored and wuchong committed Feb 22, 2020
1 parent e280ffc commit 4e92bdb
Showing 4 changed files with 22 additions and 37 deletions.
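In short, the factory methods that previously took an (ObjectPath, CatalogTable) pair now take a single factory context. The sketch below shows the caller side as exercised by the updated tests further down; it is a minimal illustration rather than code from this commit: the class name, the "mycatalog"/"mydb"/"mytable" identifier and the fresh Configuration are placeholders, and the HiveTableFactory and CatalogTable are assumed to come from an existing HiveCatalog.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveTableFactory;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;

public final class ContextBasedFactoryExample {

    /** Creates a source and a sink through the new context-based factory methods. */
    static void createThroughContext(HiveTableFactory factory, CatalogTable table) {
        // The identifier that used to arrive as an ObjectPath is now carried by the context.
        ObjectIdentifier identifier = ObjectIdentifier.of("mycatalog", "mydb", "mytable");

        // Each context bundles the identifier, the catalog table and a configuration.
        TableSource<?> source = factory.createTableSource(
                new TableSourceFactoryContextImpl(identifier, table, new Configuration()));
        TableSink<?> sink = factory.createTableSink(
                new TableSinkFactoryContextImpl(identifier, table, new Configuration()));
    }
}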
@@ -64,26 +64,16 @@ public List<String> supportedProperties() {
}

@Override
public TableSink<Row> createTableSink(Map<String, String> properties) {
throw new UnsupportedOperationException();
}

@Override
public TableSource<BaseRow> createTableSource(Map properties) {
throw new UnsupportedOperationException();
}

@Override
public TableSource<BaseRow> createTableSource(ObjectPath tablePath, CatalogTable table) {
Preconditions.checkNotNull(table);
public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context) {
CatalogTable table = checkNotNull(context.getTable());
Preconditions.checkArgument(table instanceof CatalogTableImpl);

boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

if (!isGeneric) {
return createHiveTableSource(tablePath, table);
return createHiveTableSource(context.getObjectIdentifier().toObjectPath(), table);
} else {
return TableFactoryUtil.findAndCreateTableSource(table);
return TableFactoryUtil.findAndCreateTableSource(context);
}
}

@@ -95,16 +85,16 @@ private StreamTableSource<BaseRow> createHiveTableSource(ObjectPath tablePath, C
}

@Override
public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) {
Preconditions.checkNotNull(table);
public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
CatalogTable table = checkNotNull(context.getTable());
Preconditions.checkArgument(table instanceof CatalogTableImpl);

boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

if (!isGeneric) {
return createOutputFormatTableSink(tablePath, table);
return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table);
} else {
return TableFactoryUtil.findAndCreateTableSink(table);
return TableFactoryUtil.findAndCreateTableSink(context);
}
}
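Pieced together, the fragments above give the two refactored methods roughly the following shape. This is a reconstruction from the diff hunks, assuming checkNotNull is statically imported from Flink's Preconditions; the rest of the class is unchanged and omitted.

@Override
public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context) {
    CatalogTable table = checkNotNull(context.getTable());
    Preconditions.checkArgument(table instanceof CatalogTableImpl);

    boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

    if (!isGeneric) {
        // Non-generic tables are real Hive tables and go through the Hive-specific source.
        return createHiveTableSource(context.getObjectIdentifier().toObjectPath(), table);
    } else {
        // Generic tables are delegated to standard factory discovery, now driven by the context.
        return TableFactoryUtil.findAndCreateTableSource(context);
    }
}

@Override
public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
    CatalogTable table = checkNotNull(context.getTable());
    Preconditions.checkArgument(table instanceof CatalogTableImpl);

    boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

    if (!isGeneric) {
        return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table);
    } else {
        return TableFactoryUtil.findAndCreateTableSink(context);
    }
}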

@@ -18,16 +18,20 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.StreamTableSource;
@@ -78,9 +82,11 @@ public void testGenericTable() throws Exception {
Optional<TableFactory> opt = catalog.getTableFactory();
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSource tableSource = tableFactory.createTableSource(path, table);
TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
assertTrue(tableSource instanceof StreamTableSource);
TableSink tableSink = tableFactory.createTableSink(path, table);
TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
assertTrue(tableSink instanceof StreamTableSink);
}

@@ -100,9 +106,11 @@ public void testHiveTable() throws Exception {
Optional<TableFactory> opt = catalog.getTableFactory();
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSink tableSink = tableFactory.createTableSink(path, table);
TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
assertTrue(tableSink instanceof HiveTableSink);
TableSource tableSource = tableFactory.createTableSource(path, table);
TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
assertTrue(tableSource instanceof HiveTableSource);
}

@@ -40,6 +40,7 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -447,7 +448,7 @@ private void testVectorReader(boolean fallback) throws Exception {

HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());
doReturn(new TestVectorReaderSource(new JobConf(hiveCatalog.getHiveConf()), tablePath, catalogTable))
.when(tableFactorySpy).createTableSource(any(ObjectPath.class), any(CatalogTable.class));
.when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class));
HiveCatalog catalogSpy = spy(hiveCatalog);
doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory();

@@ -88,20 +88,6 @@ private static <T> TableSink<T> findAndCreateTableSink(Map<String, String> prope
return tableSink;
}

/**
* Returns a table sink matching the {@link org.apache.flink.table.catalog.CatalogTable}.
*/
public static <T> TableSink<T> findAndCreateTableSink(CatalogTable table) {
return findAndCreateTableSink(table.toProperties());
}

/**
* Returns a table source matching the {@link org.apache.flink.table.catalog.CatalogTable}.
*/
public static <T> TableSource<T> findAndCreateTableSource(CatalogTable table) {
return findAndCreateTableSource(table.toProperties());
}

/**
* Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
*/
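With the CatalogTable-based overloads removed above, callers hand over the whole factory context instead; the CatalogTable remains reachable through it. A minimal before/after sketch follows, under the assumption that the Context-based overload called elsewhere in this diff accepts a TableSourceFactory.Context; the helper class and method name are illustrative, not part of this commit.

import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.TableSource;

public final class FactoryUtilMigration {

    static TableSource<?> discoverSource(TableSourceFactory.Context context) {
        // Before this commit: TableFactoryUtil.findAndCreateTableSource(context.getTable());
        // After it, the context itself is passed and carries the table plus its identifier.
        return TableFactoryUtil.findAndCreateTableSource(context);
    }
}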
