[FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.
This closes apache#4990.
fhueske committed Nov 16, 2017
1 parent 50fba9a commit c697bc1
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions docs/dev/table/sourceSinks.md
@@ -496,13 +496,58 @@ The following table lists the `TableSink`s which are provided with Flink.
| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
| `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
| `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.
| `Kafka010JsonTableSink` | `flink-connector-kafka-0.10` | N | Append | A Kafka 0.10 sink with JSON encoding.

All sinks that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sinks, you have to add the respective dependency in addition to the `flink-table` dependency.

A custom `TableSink` can be defined by implementing the `BatchTableSink`, `AppendStreamTableSink`, `RetractStreamTableSink`, or `UpsertStreamTableSink` interface. See section on [defining a custom TableSink](#define-a-tablesink) for details.
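
As an illustration, a minimal append-only sink might look like the following sketch. It assumes the `TableSinkBase` helper class and the `AppendStreamTableSink` interface from `org.apache.flink.table.sinks`; the class name `PrintAppendTableSink` is hypothetical and the sink simply prints every row:

{% highlight java %}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSinkBase;
import org.apache.flink.types.Row;

// Hypothetical append-only sink that prints every row to stdout.
public class PrintAppendTableSink extends TableSinkBase<Row> implements AppendStreamTableSink<Row> {

  @Override
  public TypeInformation<Row> getOutputType() {
    // derive the output type from the field types configured by the Table API
    return new RowTypeInfo(getFieldTypes());
  }

  @Override
  public void emitDataStream(DataStream<Row> dataStream) {
    // emit the append-only stream; a real sink would attach a proper SinkFunction here
    dataStream.print();
  }

  @Override
  protected PrintAppendTableSink copy() {
    return new PrintAppendTableSink();
  }
}
{% endhighlight %}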

{% top %}

### KafkaJsonTableSink

A `KafkaJsonTableSink` emits a [streaming append `Table`](./streaming.html#table-to-stream-conversion) to an Apache Kafka topic. The rows of the table are encoded as JSON records. Currently, only tables with a flat schema, i.e., non-nested fields, are supported.

A `KafkaJsonTableSink` writes to a Kafka topic with at-least-once guarantees if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing).
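
For example, checkpointing can be enabled on the `StreamExecutionEnvironment` before the table program is defined; the 5000 ms interval below is just an arbitrary example value:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpointing so that the Kafka sink provides at-least-once guarantees
env.enableCheckpointing(5000); // checkpoint interval in milliseconds (example value)

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
{% endhighlight %}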

By default, a `KafkaJsonTableSink` writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom `FlinkKafkaPartitioner` can be provided.
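
The following sketch illustrates such a partitioner; the class name `RoundRobinRowPartitioner` is hypothetical and the strategy (cycling through all partitions of the target topic) is just one possible choice:

{% highlight java %}
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

// Hypothetical partitioner that distributes rows round-robin over all partitions of the topic.
public class RoundRobinRowPartitioner extends FlinkKafkaPartitioner<Row> {

  private int nextPartition = 0;

  @Override
  public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    // cycle through all available partitions instead of always picking the same one
    nextPartition = (nextPartition + 1) % partitions.length;
    return partitions[nextPartition];
  }
}
{% endhighlight %}

Assuming the constructor variant of the sink that accepts a partitioner, it would be passed as an additional argument, e.g. `new Kafka010JsonTableSink("myTopic", props, new RoundRobinRowPartitioner())`.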

The following example shows how to create a `KafkaJsonTableSink` for Kafka 0.10. Sinks for Kafka 0.8 and 0.9 are instantiated analogously.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

Table table = ...

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

table.writeToSink(
new Kafka010JsonTableSink(
"myTopic", // Kafka topic to write to
props)); // Properties to configure the producer

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

val table: Table = ???

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")

table.writeToSink(
new Kafka010JsonTableSink(
"myTopic", // Kafka topic to write to
props)) // Properties to configure the producer

{% endhighlight %}
</div>
</div>

### CsvTableSink

The `CsvTableSink` emits a `Table` to one or more CSV files.
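
A minimal usage sketch, assuming the constructor variant that takes an output path and a field delimiter (the path below is a placeholder):

{% highlight java %}
Table table = ...

table.writeToSink(
  new CsvTableSink(
    "/path/to/output",  // output path (placeholder)
    "|"));              // field delimiter
{% endhighlight %}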
