---
title: "Upsert Kafka SQL Connector"
nav-title: Upsert Kafka
nav-parent_id: sql-connectors
nav-pos: 3
---
Scan Source: Unbounded; Sink: Streaming Upsert Mode
- This will be replaced by the TOC
{:toc}
The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in an upsert fashion.
As a source, the upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
As a sink, the upsert-kafka connector can consume a changelog stream. It writes INSERT/UPDATE_AFTER data as normal Kafka message values and writes DELETE data as Kafka messages with null values (indicating a tombstone for the key). Flink guarantees message ordering on the primary key by partitioning data on the values of the primary key columns, so update and delete messages for the same key fall into the same partition.
{% assign connector = site.data.sql-connectors['upsert-kafka'] %} {% include sql-connector-download-table.html connector=connector %}
The example below shows how to create and use an Upsert Kafka table:
{% highlight sql %}
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),  -- event-time column; precision 3 is accepted for watermark columns
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
{% endhighlight %}
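The INSERT INTO statement above writes into `pageviews_per_region`, which has to exist as an Upsert Kafka table before the query runs. A minimal sketch of such a sink follows. The column names `pv` and `uv`, the topic name, and the choice of 'json' for both formats are assumptions made for illustration; the broker list is left elided as in the example above.

{% highlight sql %}
-- Assumed sink schema: one row per region, keyed by user_region.
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,  -- page views, filled by COUNT(*)
  uv BIGINT,  -- unique visitors, filled by COUNT(DISTINCT user_id)
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',  -- assumed topic name
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
{% endhighlight %}

Because the query groups by the same column that the sink declares as its primary key, each new aggregate for a region is written as an update for that key.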
See the [regular Kafka connector]({% link dev/connectors/kafka.md %}#available-metadata) for a list of all available metadata fields.
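As a rough illustration of how metadata columns could be declared on an Upsert Kafka table, the sketch below exposes the Kafka record timestamp, partition, and offset. The table name, columns, and topic are hypothetical, and the metadata keys ('timestamp', 'partition', 'offset') are assumed to match those listed for the regular Kafka connector; check that list for the exact keys and data types in your Flink version.

{% highlight sql %}
-- Hypothetical table; metadata keys assumed from the regular Kafka connector.
CREATE TABLE user_behavior (
  user_id BIGINT,
  behavior STRING,
  -- Kafka record timestamp, exposed under a different column name
  record_time TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  -- VIRTUAL columns are read-only and are excluded when writing to the table
  `partition` INT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
{% endhighlight %}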
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specifies which connector to use. For Upsert Kafka, use 'upsert-kafka'. |
topic | required | (none) | String | The Kafka topic name to read from and write to. |
properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
properties.* | optional | (none) | String | Sets and passes arbitrary Kafka configurations. The suffix must match a configuration key defined in the Kafka configuration documentation. Flink removes the "properties." key prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. Some configurations cannot be set because Flink overrides them, e.g. 'key.deserializer' and 'value.deserializer'. |
key.format | required | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Attention: Compared to the regular Kafka connector, the key fields are specified by the PRIMARY KEY syntax. |
key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fields-include' be set to 'EXCEPT_KEY'. |
value.format | required | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
value.fields-include | optional | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format, which means that key columns appear in the data type for both the key and value format. |
sink.parallelism | optional | (none) | Integer | Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator. |
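To show how several of these options fit together, here is a hedged sketch of an Upsert Kafka table that passes a Kafka client property through the 'properties.' prefix and fixes the sink parallelism. The table name, columns, topic, and the parallelism value of 2 are hypothetical.

{% highlight sql %}
-- Hypothetical table used only to illustrate the connector options.
CREATE TABLE orders_latest (
  order_id BIGINT,
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders_latest',
  'properties.bootstrap.servers' = '...',
  -- forwarded to the Kafka client as allow.auto.create.topics
  'properties.allow.auto.create.topics' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  -- run the sink operator with two parallel instances (assumed value)
  'sink.parallelism' = '2'
);
{% endhighlight %}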
See the [regular Kafka connector]({% link dev/connectors/kafka.md %}#key-and-value-formats) for more
explanation around key and value formats. However, note that this connector requires both a key and
value format where the key fields are derived from the PRIMARY KEY
constraint.
The following example shows how to specify and configure key and value formats. The format options are
prefixed with either 'key' or 'value' plus the format identifier.
{% highlight sql %}
CREATE TABLE ... (
  ...
) WITH (
  ...
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',

  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'EXCEPT_KEY'
)
{% endhighlight %}
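For a complete picture, the format options above could sit in a table definition like the following sketch. The table name, columns, and topic are hypothetical; only the format options are taken from the example above.

{% highlight sql %}
-- Hypothetical table; the key format sees user_id, the value format sees the rest.
CREATE TABLE user_behavior_latest (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_behavior_latest',
  'properties.bootstrap.servers' = '...',

  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',

  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  -- keep primary key columns out of the message value
  'value.fields-include' = 'EXCEPT_KEY'
);
{% endhighlight %}

Under these assumptions, a row (1, 42, 'buy') would be written with a message key roughly like {"user_id":1} and, because of 'EXCEPT_KEY', a message value roughly like {"item_id":42,"behavior":"buy"}.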
The Upsert Kafka connector always works in an upsert fashion and requires a primary key to be defined in the DDL. Under the assumption that records with the same key are ordered within the same partition, the primary key semantics on the changelog source mean that the materialized changelog is unique on the primary key. The primary key definition also controls which fields end up in Kafka’s key.
By default, an Upsert Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
This means that Flink may write duplicate records with the same key into the Kafka topic. However, because the connector works in upsert mode, the last record for a given key takes effect when the topic is read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes, just like the [HBase sink]({% link dev/table/connectors/hbase.md %}).
Flink supports emitting per-partition watermarks for Upsert Kafka. Watermarks are generated inside the Kafka
consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming
shuffles. The output watermark of the source is determined by the minimum watermark among the partitions
it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can
alleviate this problem by setting the ['table.exec.source.idle-timeout']({% link dev/table/config.md %}#table-exec-source-idle-timeout)
option in the table configuration.
Please refer to [Kafka watermark strategies]({% link dev/event_timestamps_watermarks.md %}#watermark-strategies-and-the-kafka-connector) for more details.
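As an illustration, an Upsert Kafka source with an event-time column might be declared as in the sketch below, together with the idle-timeout setting mentioned above. The table, its columns, the 5-second out-of-orderness bound, and the 30-second timeout are hypothetical. The SET statement assumes the quoted SQL Client syntax of newer Flink releases; older SQL Clients use an unquoted key=value form, and the option can also be set programmatically on the table configuration.

{% highlight sql %}
-- Assumed SQL Client syntax; treat partitions without data for 30 seconds as idle.
SET 'table.exec.source.idle-timeout' = '30 s';

-- Hypothetical source table with a watermark on its event-time column.
CREATE TABLE device_status (
  device_id STRING,
  status STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (device_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'device_status',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
{% endhighlight %}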
Upsert Kafka stores message keys and values as bytes, so Upsert Kafka doesn't have schema or data types. The messages are serialized and deserialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to [Formats]({% link dev/table/connectors/formats/index.md %}) pages for more details.
{% top %}