[hotfix] [docs] Improve TableSink documentation.
fhueske committed Aug 11, 2017
1 parent 43e5a81 commit f78eb0f
Showing 1 changed file with 64 additions and 6 deletions.
docs/dev/table/sourceSinks.md
@@ -34,8 +34,6 @@ Have a look at the [common concepts and API](common.html) page for details how t
Provided TableSources
---------------------

**TODO: extend and complete**

Currently, Flink provides the `CsvTableSource` to read CSV files and a few table sources to read JSON or Avro data from Kafka.
A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. See the section on [defining a custom TableSource](#define-a-tablesource) for details.

@@ -202,9 +200,63 @@ val csvTableSource = CsvTableSource
Provided TableSinks
-------------------

The following table lists the `TableSink`s which are provided with Flink.

| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description** |
| -------------- | -------------------- | ---------- | -------------- | --------------- |
| `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files. |
| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes tables to a JDBC database. |
| `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding. |
| `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding. |

All sinks that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sinks, you have to add the respective dependency in addition to the `flink-table` dependency.

A custom `TableSink` can be defined by implementing the `BatchTableSink`, `AppendStreamTableSink`, `RetractStreamTableSink`, or `UpsertStreamTableSink` interface. See the section on [defining a custom TableSink](#define-a-tablesink) for details.
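
For a first impression of the shape of these interfaces, the following is a minimal sketch of a hypothetical append-only sink that simply prints rows. The class name and the printing logic are made up for illustration, and the sketch assumes the method set of the `TableSink` and `AppendStreamTableSink` interfaces in the current API; see the section linked above for the full contract.

{% highlight java %}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.types.Row;

// Hypothetical sink that prints every appended row to standard output.
public class PrintAppendTableSink implements AppendStreamTableSink<Row> {

  private String[] fieldNames;
  private TypeInformation<?>[] fieldTypes;

  @Override
  public TypeInformation<Row> getOutputType() {
    return new RowTypeInfo(fieldTypes, fieldNames);
  }

  @Override
  public String[] getFieldNames() {
    return fieldNames;
  }

  @Override
  public TypeInformation<?>[] getFieldTypes() {
    return fieldTypes;
  }

  // Called by the Table API to obtain a copy of the sink configured with the schema of the Table to emit.
  @Override
  public PrintAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    PrintAppendTableSink configured = new PrintAppendTableSink();
    configured.fieldNames = fieldNames;
    configured.fieldTypes = fieldTypes;
    return configured;
  }

  // Receives the DataStream of appended rows and attaches the actual output operator.
  @Override
  public void emitDataStream(DataStream<Row> dataStream) {
    dataStream.print();
  }
}
{% endhighlight %}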

{% top %}

### CsvTableSink

The `CsvTableSink` emits a `Table` to one or more CSV files.

The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled). Note that the `CsvTableSink` does not split the output into bucket files but continuously writes to the same files.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

Table table = ...

table.writeToSink(
  new CsvTableSink(
    path,                             // output path
    "|",                              // optional: delimit fields by '|'
    1,                                // optional: write to a single file
    WriteMode.OVERWRITE));            // optional: overwrite existing files

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

val table: Table = ???

table.writeToSink(
  new CsvTableSink(
    path,                             // output path
    fieldDelim = "|",                 // optional: delimit fields by '|'
    numFiles = 1,                     // optional: write to a single file
    writeMode = WriteMode.OVERWRITE)) // optional: overwrite existing files

{% endhighlight %}
</div>
</div>

### JDBCAppendTableSink

The `JDBCAppendTableSink` emits a `Table` to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details.

The `JDBCAppendTableSink` inserts each `Table` row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using `REPLACE` or `INSERT OVERWRITE` to perform upsert writes to the database.

To use the JDBC sink, you have to add the JDBC connector dependency (`flink-jdbc`) to your project. Then you can create the sink with the builder returned by `JDBCAppendTableSink.builder()`:

@@ -218,22 +270,28 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();

Table table = ...
table.writeToSink(sink);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build()

val table: Table = ???
table.writeToSink(sink)
{% endhighlight %}
</div>
</div>

Similar to using `JDBCOutputFormat`, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
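
For example, on a database that supports MySQL's `REPLACE INTO` syntax, an upsert-style sink could be configured as in the following sketch, which mirrors the builder calls shown above. The driver class, URL, and the `books` table with its `id` and `title` columns are placeholders for this example, not something provided by Flink.

{% highlight java %}
// REPLACE INTO deletes a row with a conflicting primary key before inserting the new
// one, so repeatedly writing the same key behaves like an upsert inside the database.
JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
  .setDrivername("com.mysql.jdbc.Driver")                    // placeholder JDBC driver
  .setDBUrl("jdbc:mysql://localhost:3306/ebookshop")         // placeholder database URL
  .setQuery("REPLACE INTO books (id, title) VALUES (?, ?)")  // hypothetical table and columns
  .setParameterTypes(INT_TYPE_INFO, STRING_TYPE_INFO)
  .build();

Table table = ...
table.writeToSink(upsertSink);
{% endhighlight %}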

{% top %}
