[FLINK-10156][table] Deprecate Table.writeToSink().
This closes apache#6805.
fhueske committed Oct 12, 2018
1 parent 383cf88 commit d1a03dd
Showing 13 changed files with 258 additions and 80 deletions.
48 changes: 20 additions & 28 deletions docs/dev/table/common.md
@@ -46,14 +46,16 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...); // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);
tapiResult.insertInto("outputTable");

// execute
env.execute();
@@ -72,15 +74,17 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)
// register an output Table
tableEnv.registerTableSink("outputTable", ...)

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)
tapiResult.insertInto("outputTable")

// execute
env.execute()
@@ -500,10 +504,7 @@ A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Ta

Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
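
For orientation, the following is a minimal, hedged sketch of a custom append-only sink, assuming the `AppendStreamTableSink` interface of this release; the class name `PrintAppendSink` is invented for illustration. The planner calls `configure()` with the schema of the table to emit before the sink is used:

{% highlight java %}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// hypothetical minimal append-only sink that prints every row
public class PrintAppendSink implements AppendStreamTableSink<Row> {

  private String[] fieldNames;
  private TypeInformation<?>[] fieldTypes;

  @Override
  public void emitDataStream(DataStream<Row> dataStream) {
    dataStream.print(); // a real sink would write to an external system here
  }

  @Override
  public TypeInformation<Row> getOutputType() {
    return new RowTypeInfo(fieldTypes, fieldNames);
  }

  @Override
  public String[] getFieldNames() {
    return fieldNames;
  }

  @Override
  public TypeInformation<?>[] getFieldTypes() {
    return fieldTypes;
  }

  @Override
  public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    // return a configured copy rather than mutating this instance
    PrintAppendSink configured = new PrintAppendSink();
    configured.fieldNames = fieldNames;
    configured.fieldTypes = fieldTypes;
    return configured;
  }
}
{% endhighlight %}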

There are two ways to emit a table:

1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
The `Table.insertInto(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` in the `TableEnvironment`'s catalog by name and validates that the schema of the `Table` is identical to the schema of the `TableSink`.
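
The schema check is a pre-flight validation: if the schemas differ, `insertInto()` should fail before any job is executed. A hedged sketch of the failure mode, with hypothetical names (`tableEnv` and `sink` as in the example below):

{% highlight java %}
// the sink is registered with a single INT field ...
String[] narrowNames = {"a"};
TypeInformation[] narrowTypes = {Types.INT};
tableEnv.registerTableSink("narrowSink", narrowNames, narrowTypes, sink);

// ... but the query produces a (STRING, LONG) result
Table wide = tableEnv.scan("table1").select("name, cnt");

// schema mismatch: this call should fail with a ValidationException
wide.insertInto("narrowSink");
{% endhighlight %}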

The following examples show how to emit a `Table`:

@@ -513,22 +514,17 @@
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// METHOD 1:
// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink);

// METHOD 2:
// Register the TableSink with a specific schema
// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
// Emit the result Table to the registered TableSink via the insertInto() method

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
@@ -540,22 +536,18 @@
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// METHOD 1:
// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink)

// METHOD 2:
// Register the TableSink with a specific schema
// register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
// Emit the result Table to the registered TableSink via the insertInto() method

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")

// execute the program
@@ -576,7 +568,7 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de

A Table API or SQL query is translated when:

* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called.
* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
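
A short sketch of all three triggers, assuming a registered input table `orders` and a registered sink `outputTable` (both names hypothetical):

{% highlight java %}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// trigger 1: emitting a Table to a registered TableSink
tableEnv.scan("orders").select("a, b").insertInto("outputTable");

// trigger 2: specifying a SQL update query
tableEnv.sqlUpdate("INSERT INTO outputTable SELECT a, b FROM orders");

// trigger 3: converting a Table into a DataStream
DataStream<Row> stream =
    tableEnv.toAppendStream(tableEnv.scan("orders"), Row.class);
{% endhighlight %}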

68 changes: 54 additions & 14 deletions docs/dev/table/connect.md
@@ -1047,30 +1047,42 @@ The sink only supports append-only streaming tables. It cannot be used to emit a
<div data-lang="java" markdown="1">
{% highlight java %}

Table table = ...

table.writeToSink(
new CsvTableSink(
CsvTableSink sink = new CsvTableSink(
path, // output path
"|", // optional: delimit files by '|'
1, // optional: write to a single file
WriteMode.OVERWRITE)); // optional: override existing files
WriteMode.OVERWRITE); // optional: override existing files

tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  new String[]{"f0", "f1"},
  new TypeInformation[]{Types.STRING, Types.INT},
  sink);

Table table = ...
table.insertInto("csvOutputTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

val table: Table = ???

table.writeToSink(
new CsvTableSink(
val sink: CsvTableSink = new CsvTableSink(
path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
writeMode = WriteMode.OVERWRITE)) // optional: override existing files
writeMode = WriteMode.OVERWRITE) // optional: override existing files

tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  Array[String]("f0", "f1"),
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  sink)

val table: Table = ???
table.insertInto("csvOutputTable")
{% endhighlight %}
</div>
</div>
@@ -1094,8 +1106,15 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setParameterTypes(INT_TYPE_INFO)
.build();

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  new String[]{"id"},
  new TypeInformation[]{Types.INT},
  sink);

Table table = ...
table.writeToSink(sink);
table.insertInto("jdbcOutputTable");
{% endhighlight %}
</div>

@@ -1108,8 +1127,15 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setParameterTypes(INT_TYPE_INFO)
.build()

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  Array[String]("id"),
  Array[TypeInformation[_]](Types.INT),
  sink)

val table: Table = ???
table.writeToSink(sink)
table.insertInto("jdbcOutputTable")
{% endhighlight %}
</div>
</div>
@@ -1137,8 +1163,15 @@ CassandraAppendTableSink sink = new CassandraAppendTableSink(
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  new String[]{"id", "name", "value"},
  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
  sink);

Table table = ...
table.writeToSink(sink);
table.insertInto("cassandraOutputTable");
{% endhighlight %}
</div>

@@ -1151,8 +1184,15 @@ val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  Array[String]("id", "name", "value"),
  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
  sink)

val table: Table = ???
table.writeToSink(sink)
table.insertInto("cassandraOutputTable")
{% endhighlight %}
</div>
</div>
18 changes: 16 additions & 2 deletions docs/dev/table/streaming/query_configuration.md
@@ -43,8 +43,15 @@ Table result = ...
// create TableSink
TableSink<Row> sink = ...

// register TableSink
tableEnv.registerTableSink(
"outputTable", // table name
new String[]{...}, // field names
new TypeInformation[]{...}, // field types
sink); // table sink

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);
result.insertInto("outputTable", qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
@@ -67,8 +74,15 @@ val result: Table = ???
// create TableSink
val sink: TableSink[Row] = ???

// register TableSink
tableEnv.registerTableSink(
"outputTable", // table name
Array[String](...), // field names
Array[TypeInformation[_]](...), // field types
sink) // table sink

// emit result Table via a TableSink
result.writeToSink(sink, qConfig)
result.insertInto("outputTable", qConfig)

// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
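
For context, the `qConfig` passed to `insertInto()` and `toAppendStream()` above is the query configuration this page describes. A minimal Java sketch of obtaining and tuning one, assuming the idle-state-retention API of this release:

{% highlight java %}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain a mutable query configuration from the TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();

// keep state for idle keys for at least one hour and at most two hours
qConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2));
{% endhighlight %}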
@@ -24,6 +24,7 @@
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -456,9 +457,13 @@ public void testCassandraTableSink() throws Exception {
DataStreamSource<Row> source = env.fromCollection(rowCollection);

tEnv.registerDataStreamInternal("testFlinkTable", source);
tEnv.registerTableSink("cassandraTable",
new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY))
.configure(
new String[]{"f0", "f1", "f2"},
new TypeInformation[]{Types.STRING, Types.INT, Types.INT}));

tEnv.sql("select * from testFlinkTable").writeToSink(
new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));
tEnv.sqlQuery("select * from testFlinkTable").insertInto("cassandraTable");

env.execute();
ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
@@ -891,7 +891,13 @@ class Table(
*
* @param sink The [[TableSink]] to which the [[Table]] is written.
* @tparam T The data type that the [[TableSink]] expects.
*
* @deprecated Will be removed in a future release. Please register the TableSink and use
* Table.insertInto().
*/
@deprecated("This method will be removed. Please register the TableSink and use " +
"Table.insertInto().", "1.7.0")
@Deprecated
def writeToSink[T](sink: TableSink[T]): Unit = {
val queryConfig = Option(this.tableEnv) match {
case None => null
@@ -912,7 +918,13 @@
* @param sink The [[TableSink]] to which the [[Table]] is written.
* @param conf The configuration for the query that writes to the sink.
* @tparam T The data type that the [[TableSink]] expects.
*
* @deprecated Will be removed in a future release. Please register the TableSink and use
* Table.insertInto().
*/
@deprecated("This method will be removed. Please register the TableSink and use " +
"Table.insertInto().", "1.7.0")
@Deprecated
def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = {
// get schema information of table
val rowType = getRelNode.getRowType
@@ -944,7 +956,12 @@ class Table(
* @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
*/
def insertInto(tableName: String): Unit = {
tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
this.logicalPlan match {
case _: LogicalTableFunctionCall =>
throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
case _ =>
tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
}
}

/**
@@ -960,7 +977,12 @@
* @param conf The [[QueryConfig]] to use.
*/
def insertInto(tableName: String, conf: QueryConfig): Unit = {
tableEnv.insertInto(this, tableName, conf)
this.logicalPlan match {
case _: LogicalTableFunctionCall =>
throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
case _ =>
tableEnv.insertInto(this, tableName, conf)
}
}

/**
@@ -17,10 +17,12 @@
*/
package org.apache.flink.table.api.stream.table.validation

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.runtime.stream.table.TestAppendSink
import org.apache.flink.table.utils.{ObjectTableFunction, TableFunc1, TableFunc2, TableTestBase}
import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
@@ -47,6 +49,9 @@ class CorrelateValidationTest extends TableTestBase {

val func1 = new TableFunc1
util.javaTableEnv.registerFunction("func1", func1)
util.javaTableEnv.registerTableSink(
"testSink", new TestAppendSink().configure(
Array[String]("f"), Array[TypeInformation[_]](Types.INT)))

// table function call select
expectExceptionThrown(
@@ -60,10 +65,10 @@
"TableFunction can only be used in join and leftOuterJoin."
)

// table function call writeToSink
// table function call insertInto
expectExceptionThrown(
func1('c).writeToSink(null),
"Cannot translate a query with an unbounded table function call."
func1('c).insertInto("testSink"),
"TableFunction can only be used in join and leftOuterJoin."
)

// table function call distinct