[FLINK-17748][table] Remove registration of TableSource/TableSink in table env

The old TableSource/TableSink interfaces will be replaced by FLIP-95 in the future, so we
choose a more lightweight solution: move the registration methods from TableEnvironment
to TableEnvironmentInternal, hiding them from users while keeping them available to our
codebase.

After FLIP-95 is done, we should continue to improve the table factory and the related descriptor API
to make it easy and intuitive for users to register user-defined tables (sources & sinks).
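For illustration (not part of the original message): a minimal sketch of the resulting call pattern,
assuming the 1.10-era CsvTableSink — inside the codebase, callers now cast to the internal interface.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.internal.TableEnvironmentInternal;
    import org.apache.flink.table.sinks.CsvTableSink;

    public class RegistrationSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            // Previously public API: tEnv.registerTableSink("sinkTable", sink);
            // after this change the method lives on TableEnvironmentInternal only.
            ((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
                    "sinkTable",
                    new CsvTableSink("/tmp/out.csv").configure(
                            new String[]{"f0", "f1"},
                            new TypeInformation[]{Types.INT, Types.STRING}));
        }
    }

registerTableSourceInternal has the same shape, as the migrated call sites below show.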

This closes apache#12076
docete authored and KurtYoung committed May 16, 2020
1 parent da16f9e commit 32b79d1
Showing 83 changed files with 659 additions and 600 deletions.
@@ -45,6 +45,7 @@
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.table.api.TableResult;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.flink.types.Row;
@@ -463,7 +464,7 @@ public void testCassandraTableSink() throws Exception {
DataStreamSource<Row> source = env.fromCollection(rowCollection);

tEnv.createTemporaryView("testFlinkTable", source);
- tEnv.registerTableSink(
+ ((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
"cassandraTable",
new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)).configure(
new String[]{"f0", "f1", "f2"},
@@ -58,7 +58,7 @@
* hSrc.addColumn("fam1", "col2", Integer.class);
* hSrc.addColumn("fam2", "col1", String.class);
*
- * tableEnv.registerTableSource("hTable", hSrc);
+ * tableEnv.registerTableSourceInternal("hTable", hSrc);
* Table res = tableEnv.sqlQuery(
* "SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t " +
* "WHERE t.rowkey LIKE 'flink%' GROUP BY t.fam2.col1");
@@ -35,6 +35,7 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -100,7 +101,7 @@ public void testTableSourceFullScan() throws Exception {
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tEnv.registerTableSource("hTable", hbaseTable);
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);

Table table = tEnv.sqlQuery("SELECT " +
" h.family1.col1, " +
@@ -135,7 +136,7 @@ public void testTableSourceProjection() throws Exception {
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tEnv.registerTableSource("hTable", hbaseTable);
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);

Table table = tEnv.sqlQuery("SELECT " +
" h.family1.col1, " +
@@ -169,7 +170,7 @@ public void testTableSourceFieldOrder() throws Exception {
hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tEnv.registerTableSource("hTable", hbaseTable);
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);

Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");

@@ -194,7 +195,7 @@ public void testTableSourceReadAsByteArray() throws Exception {
HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
- tEnv.registerTableSource("hTable", hbaseTable);
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
tEnv.registerFunction("toUTF8", new ToUTF8());
tEnv.registerFunction("toLong", new ToLong());

@@ -293,7 +294,7 @@ public void testTableSink() throws Exception {

DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
tEnv.createTemporaryView("src", ds);
- tEnv.registerTableSink("hbase", tableSink);
+ ((TableEnvironmentInternal) tEnv).registerTableSinkInternal("hbase", tableSink);

String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
@@ -310,7 +311,7 @@ public void testTableSink() throws Exception {
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- batchTableEnv.registerTableSource("hTable", hbaseTable);
+ ((TableEnvironmentInternal) batchTableEnv).registerTableSourceInternal("hTable", hbaseTable);

Table table = batchTableEnv.sqlQuery(
"SELECT " +
@@ -475,7 +476,7 @@ public void testHBaseLookupTableSource() throws Exception {
TableSource<?> source = TableFactoryService
.find(HBaseTableFactory.class, tableProperties)
.createTableSource(tableProperties);
- streamTableEnv.registerTableSource("hbaseLookup", source);
+ ((TableEnvironmentInternal) streamTableEnv).registerTableSourceInternal("hbaseLookup", source);
// perform a temporal table join query
String query = "SELECT a,family1.col1, family3.col3 FROM src " +
"JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk";
@@ -28,6 +28,7 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
@@ -61,9 +62,9 @@ public static void main(String[] args) throws Exception {
.inBatchMode()
.build());

- tEnv.registerTableSource("table1", new GeneratorTableSource(10, 100, 60, 0));
- tEnv.registerTableSource("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
- tEnv.registerTableSink("sinkTable",
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
+ ((TableEnvironmentInternal) tEnv).registerTableSinkInternal("sinkTable",
new CsvTableSink(outputPath)
.configure(new String[]{"f0", "f1"}, new TypeInformation[]{Types.INT, Types.SQL_TIMESTAMP}));

@@ -46,6 +46,7 @@
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
@@ -108,8 +109,8 @@ public static void main(String[] args) throws Exception {

final StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, settings);

- tEnv.registerTableSource("table1", new GeneratorTableSource(10, 100, 60, 0));
- tEnv.registerTableSource("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));

int overWindowSizeSeconds = 1;
int tumbleWindowSizeSeconds = 10;
@@ -26,6 +26,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.sinks.CsvTableSink;
@@ -92,7 +93,7 @@ public static void main(String[] args) throws Exception {

//register sink table
String sinkTableName = QUERY_PREFIX + queryId + "_sinkTable";
- tableEnvironment.registerTableSink(sinkTableName,
+ ((TableEnvironmentInternal) tableEnvironment).registerTableSinkInternal(sinkTableName,
new CsvTableSink(
sinkTablePath + FILE_SEPARATOR + queryId + RESULT_SUFFIX,
COL_DELIMITER,
@@ -74,7 +74,7 @@
* .forOrcSchema("struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>")
* .build();
*
- * tEnv.registerTableSource("orcTable", orcSrc);
+ * tEnv.registerTableSourceInternal("orcTable", orcSrc);
* Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
* }
* </pre>
@@ -21,6 +21,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
@@ -54,7 +55,7 @@ public void testFullScan() throws Exception {
.path(getPath(TEST_FILE_FLAT))
.forOrcSchema(TEST_SCHEMA_FLAT)
.build();
- tEnv.registerTableSource("OrcTable", orc);
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("OrcTable", orc);

String query =
"SELECT COUNT(*), " +
@@ -89,7 +90,7 @@ public void testScanWithProjectionAndFilter() throws Exception {
.path(getPath(TEST_FILE_FLAT))
.forOrcSchema(TEST_SCHEMA_FLAT)
.build();
- tEnv.registerTableSource("OrcTable", orc);
+ ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("OrcTable", orc);

String query =
"SELECT " +
@@ -86,7 +86,7 @@
* .schema(messageType)
* .build();
*
- * tEnv.registerTableSource("parquetTable", parquetSrc);
+ * tEnv.registerTableSourceInternal("parquetTable", parquetSrc);
* Table res = tableEnv.sqlQuery("SELECT * FROM parquetTable");
* }
* </pre>
@@ -23,6 +23,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.TestUtil;
import org.apache.flink.table.api.Table;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
@@ -64,7 +65,7 @@ public void testFullScan() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
ParquetTableSource tableSource = createParquetTableSource(testPath);
- batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
+ ((TableEnvironmentInternal) batchTableEnvironment).registerTableSourceInternal("ParquetTable", tableSource);
String query =
"SELECT foo " +
"FROM ParquetTable";
@@ -81,7 +82,7 @@ public void testScanWithProjectionAndFilter() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
ParquetTableSource tableSource = createParquetTableSource(testPath);
- batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
+ ((TableEnvironmentInternal) batchTableEnvironment).registerTableSourceInternal("ParquetTable", tableSource);
String query =
"SELECT foo " +
"FROM ParquetTable WHERE foo >= 1 AND bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50";
4 changes: 2 additions & 2 deletions flink-python/pyflink/table/table_environment.py
@@ -190,7 +190,7 @@ def register_table_source(self, name, table_source):
.. note:: Deprecated in 1.10. Use :func:`connect` instead.
"""
warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
- self._j_tenv.registerTableSource(name, table_source._j_table_source)
+ self._j_tenv.registerTableSourceInternal(name, table_source._j_table_source)

def register_table_sink(self, name, table_sink):
"""
@@ -215,7 +215,7 @@ def register_table_sink(self, name, table_sink):
.. note:: Deprecated in 1.10. Use :func:`connect` instead.
"""
warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
- self._j_tenv.registerTableSink(name, table_sink._j_table_sink)
+ self._j_tenv.registerTableSinkInternal(name, table_sink._j_table_sink)

def scan(self, *table_path):
"""
@@ -39,6 +39,7 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl;
@@ -581,9 +582,9 @@ private void initializeCatalogs() {
}
});
// register table sources
- tableSources.forEach(tableEnv::registerTableSource);
+ tableSources.forEach(((TableEnvironmentInternal) tableEnv)::registerTableSourceInternal);
// register table sinks
- tableSinks.forEach(tableEnv::registerTableSink);
+ tableSinks.forEach(((TableEnvironmentInternal) tableEnv)::registerTableSinkInternal);

//--------------------------------------------------------------------------------------------------------------
// Step.4 Register temporal tables.
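An aside on the forEach lines in the hunk above (an illustrative sketch with hypothetical names, not
Flink API): a Java method reference can be bound to an already-cast receiver, so the cast is evaluated
once when the reference is created, not once per element.

    import java.util.HashMap;
    import java.util.Map;

    public class CastBoundMethodRef {
        interface Env { }
        interface InternalEnv extends Env {
            void registerInternal(String name, Object source);
        }

        public static void main(String[] args) {
            Env env = (InternalEnv) (name, source) ->
                    System.out.println("registered " + name);

            Map<String, Object> tableSources = new HashMap<>();
            tableSources.put("table1", new Object());
            tableSources.put("table2", new Object());

            // Same effect as (n, s) -> ((InternalEnv) env).registerInternal(n, s),
            // but the cast to the internal interface happens once, up front.
            tableSources.forEach(((InternalEnv) env)::registerInternal);
        }
    }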
@@ -39,6 +39,7 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
+ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
@@ -627,7 +628,7 @@ private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionCon
try {
// writing to a sink requires an optimization step that might reference UDFs during code compilation
context.wrapClassLoader(() -> {
- context.getTableEnvironment().registerTableSink(tableName, result.getTableSink());
+ ((TableEnvironmentInternal) context.getTableEnvironment()).registerTableSinkInternal(tableName, result.getTableSink());
table.insertInto(tableName);
});
pipeline = context.createPipeline(jobName);
@@ -244,7 +244,7 @@ public void testTemporalTables() throws Exception {
new String[]{"integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0"},
tableEnv.from("TemporalTableUsage").getSchema().getFieldNames());

- // Please delete this test after removing registerTableSource in SQL-CLI.
+ // Please delete this test after removing registerTableSourceInternal in SQL-CLI.
TableSchema tableSchema = tableEnv.from("EnrichmentSource").getSchema();
LogicalType timestampType = tableSchema.getFieldDataTypes()[2].getLogicalType();
assertTrue(timestampType instanceof TimestampType);
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
@@ -545,55 +544,6 @@ default Table fromValues(DataType rowType, Object... values) {
*/
void createTemporaryView(String path, Table view);

- /**
- * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
- * be inaccessible in the current session. To make the permanent object available again one can drop the
- * corresponding temporary object.
- *
- * @param name The name under which the {@link TableSource} is registered.
- * @param tableSource The {@link TableSource} to register.
- * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
- */
- @Deprecated
- void registerTableSource(String name, TableSource<?> tableSource);
-
- /**
- * Registers an external {@link TableSink} with given field names and types in this
- * {@link TableEnvironment}'s catalog.
- * Registered sink tables can be referenced in SQL DML statements.
- *
- * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
- * be inaccessible in the current session. To make the permanent object available again one can drop the
- * corresponding temporary object.
- *
- * @param name The name under which the {@link TableSink} is registered.
- * @param fieldNames The field names to register with the {@link TableSink}.
- * @param fieldTypes The field types to register with the {@link TableSink}.
- * @param tableSink The {@link TableSink} to register.
- * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
- */
- @Deprecated
- void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink);
-
- /**
- * Registers an external {@link TableSink} with already configured field names and field types in
- * this {@link TableEnvironment}'s catalog.
- * Registered sink tables can be referenced in SQL DML statements.
- *
- * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
- * be inaccessible in the current session. To make the permanent object available again one can drop the
- * corresponding temporary object.
- *
- * @param name The name under which the {@link TableSink} is registered.
- * @param configuredSink The configured {@link TableSink} to register.
- * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
- */
- @Deprecated
- void registerTableSink(String name, TableSink<?> configuredSink);

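The deprecation notes in the deleted Javadoc point to connect() as the user-facing replacement. A
hedged sketch of that migration path, assuming the 1.10-era filesystem and CSV descriptors (OldCsv
is the format the CSV table factories of that generation accept):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.descriptors.FileSystem;
    import org.apache.flink.table.descriptors.OldCsv;
    import org.apache.flink.table.descriptors.Schema;

    public class ConnectSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            // Descriptor-based registration instead of registerTableSink(name, sink):
            tEnv.connect(new FileSystem().path("/tmp/out.csv"))
                .withFormat(new OldCsv()
                    .field("f0", Types.INT)
                    .field("f1", Types.STRING))
                .withSchema(new Schema()
                    .field("f0", Types.INT)
                    .field("f1", Types.STRING))
                .createTemporaryTable("sinkTable");
        }
    }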
/**
* Scans a registered table and returns the resulting {@link Table}.
*
@@ -924,7 +874,7 @@ default Table fromValues(DataType rowType, Object... values) {
* <pre>
* {@code
* // register the configured table sink into which the result is inserted.
- * tEnv.registerTableSink("sinkTable", configuredSink);
+ * tEnv.registerTableSinkInternal("sinkTable", configuredSink);
* Table sourceTable = ...
* String tableName = sourceTable.toString();
* // sourceTable is not registered to the table environment