[FLINK-19152] Remove Kafka 0.10.x and 0.11.x connectors
aljoscha committed Sep 15, 2020
1 parent e594cf5 commit 3df63de
Showing 101 changed files with 79 additions and 13,072 deletions.
105 changes: 28 additions & 77 deletions docs/dev/connectors/kafka.md
@@ -30,12 +30,9 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading

## Dependency

Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11.
This universal Kafka connector attempts to track the latest version of the Kafka client.
Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
The version of the client it uses may change between Flink releases.
Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
For most users the universal Kafka connector is the most appropriate.
However, for Kafka versions 0.11.x and 0.10.x, we recommend using the dedicated ``0.11`` and ``0.10`` connectors, respectively.
For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).

<div class="codetabs" markdown="1">
@@ -48,34 +45,14 @@ For details on Kafka compatibility, please refer to the official [Kafka document
</dependency>
{% endhighlight %}
</div>
<div data-lang="011" markdown="1">
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-011{{ site.scala_version_suffix }}</artifactId>
<version>{{ site.version }}</version>
</dependency>
{% endhighlight %}
</div>
<div data-lang="010" markdown="1">
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-010{{ site.scala_version_suffix }}</artifactId>
<version>{{ site.version }}</version>
</dependency>
{% endhighlight %}
<span class="label label-danger">Attention</span> The ``0.10`` sink does not support exactly-once writes to Kafka.
</div>
</div>

Flink's streaming connectors are not currently part of the binary distribution.
See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/project-configuration.html).

## Kafka Consumer

Flink's Kafka consumer - `FlinkKafkaConsumer` (or `FlinkKafkaConsumer011` for Kafka 0.11.x,
or `FlinkKafkaConsumer010` for Kafka 0.10.x) - provides access to read from one or more Kafka topics.
Flink's Kafka consumer, `FlinkKafkaConsumer`, provides access to read from one or more Kafka topics.
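
A minimal usage sketch (assuming an existing `StreamExecutionEnvironment` named `env`; the broker address, group id, and topic name are placeholders):

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// Read the topic as a stream of UTF-8 strings using the universal connector
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
{% endhighlight %}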

The constructor accepts the following arguments:

@@ -405,8 +382,7 @@ Consider setting appropriate [idleness timeouts]({{ site.baseurl }}/dev/event_ti

## Kafka Producer

Flink’s Kafka Producer - `FlinkKafkaProducer` (or `FlinkKafkaProducer010` for Kafka 0.10.x versions or `FlinkKafkaProducer011` for Kafka 0.11.x versions) -
allows writing a stream of records to one or more Kafka topics.
Flink’s Kafka Producer, `FlinkKafkaProducer`, allows writing a stream of records to one or more Kafka topics.
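
A minimal usage sketch (assuming an existing `DataStream<String>` named `stream`; the broker address and topic name are placeholders):

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

// Write each record of the stream to the given topic as a UTF-8 string
stream.addSink(new FlinkKafkaProducer<>(
    "topic",                    // target topic
    new SimpleStringSchema(),   // serialization schema
    properties));               // producer config
{% endhighlight %}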

The constructor accepts the following arguments:

@@ -465,13 +441,11 @@ Through the producer record you can:

### Kafka Producers and Fault Tolerance

<div class="codetabs" markdown="1">
<div data-lang="Universal and 011" markdown="1">
With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions) can provide
With Flink's checkpointing enabled, the `FlinkKafkaProducer` can provide
exactly-once delivery guarantees.

Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation,
chosen by passing appropriate `semantic` parameter to the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions):
selected by passing the appropriate `semantic` parameter to the `FlinkKafkaProducer`:

* `Semantic.NONE`: Flink will not guarantee anything. Produced records can be lost or they can
be duplicated.
@@ -492,7 +466,7 @@ times.

Kafka brokers by default have `transaction.max.timeout.ms` set to 15 minutes. This property does not
allow setting transaction timeouts for the producers larger than its value.
`FlinkKafkaProducer011` by default sets the `transaction.timeout.ms` property in producer config to
`FlinkKafkaProducer` by default sets the `transaction.timeout.ms` property in producer config to
1 hour, thus `transaction.max.timeout.ms` should be increased before using the
`Semantic.EXACTLY_ONCE` mode.
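
As an illustration, a sketch that keeps the producer's transaction timeout within the default broker limit and enables exactly-once delivery (the constructor variant and values shown are assumptions; adjust them to your setup):

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Stay within the broker's default transaction.max.timeout.ms of 15 minutes
properties.setProperty("transaction.timeout.ms", "900000");

stream.addSink(new FlinkKafkaProducer<>(
    "topic",                                                          // default target topic
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),  // serialization schema
    properties,                                                       // producer config
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));                       // use Kafka transactions
{% endhighlight %}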

@@ -515,50 +489,16 @@ the consumers until `transaction1` is committed or aborted. This has two implica
agents/applications writing to the same Kafka topic.

**Note**: `Semantic.EXACTLY_ONCE` mode uses a fixed size pool of KafkaProducers
per each `FlinkKafkaProducer011` instance. One of each of those producers is used per one
checkpoint. If the number of concurrent checkpoints exceeds the pool size, `FlinkKafkaProducer011`
per `FlinkKafkaProducer` instance. Each of those producers is used for exactly one
checkpoint. If the number of concurrent checkpoints exceeds the pool size, `FlinkKafkaProducer`
will throw an exception and will fail the whole application. Please configure max pool size and max
number of concurrent checkpoints accordingly.
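
For instance, a sketch of aligning the number of concurrent checkpoints with the default pool size (assuming an existing `StreamExecutionEnvironment` named `env`):

{% highlight java %}
// Checkpoint every minute and keep at most one checkpoint in flight,
// so the fixed-size KafkaProducer pool is never exhausted
env.enableCheckpointing(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
{% endhighlight %}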

**Note**: `Semantic.EXACTLY_ONCE` takes all possible measures not to leave any lingering transactions
that would block the consumers from reading from the Kafka topic for longer than necessary. However, in the
event of a failure of the Flink application before the first checkpoint, there is no information in the
system about the previous pool sizes after restarting the application. Thus it is unsafe to scale down the Flink
application before first checkpoint completes, by factor larger than `FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.

</div>
<div data-lang="010" markdown="1">
With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
can provide at-least-once delivery guarantees.

Besides enabling Flink's checkpointing, you should also configure the setter
methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.

* `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
Enabling this will let the producer only log failures
instead of catching and rethrowing them. This essentially accounts the record
to have succeeded, even if it was never written to the target Kafka topic. This
must be disabled for at-least-once.
* `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
With this enabled, Flink's checkpoints will wait for any
on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
succeeding the checkpoint. This ensures that all records before the checkpoint have
been written to Kafka. This must be enabled for at-least-once.

In conclusion, the Kafka producer by default has at-least-once guarantees for versions
0.10, with `setLogFailuresOnly` set to `false` and `setFlushOnCheckpoint` set
to `true`.
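
A configuration sketch for the legacy `FlinkKafkaProducer010` described above (shown only for the removed connector; the broker address and topic name are placeholders):

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer010<String> producer =
    new FlinkKafkaProducer010<>("topic", new SimpleStringSchema(), properties);
producer.setLogFailuresOnly(false);   // fail on write errors instead of only logging them
producer.setFlushOnCheckpoint(true);  // wait for in-flight records before completing a checkpoint

stream.addSink(producer);
{% endhighlight %}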

**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
we recommend setting the number of retries to a higher value.

**Note**: There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
into a Kafka topic.

</div>
</div>
application before the first checkpoint completes by a factor larger than `FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR`.

## Kafka Connector Metrics

@@ -606,14 +546,25 @@ A mismatch in service name between client and server configuration will cause the
For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/ops/config.html).
You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.

## Migrating Kafka Connector from 0.11 to universal

In order to perform the migration, see the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html)
and:
* Use Flink 1.9 or newer for the whole process.
* Do not upgrade Flink and user operators at the same time.
* Make sure that Kafka Consumer and/or Kafka Producer used in your job have assigned unique identifiers (`uid`):
* Use the stop-with-savepoint feature to take the savepoint (for example by using `stop --withSavepoint`; see the [CLI command]({{ site.baseurl }}/ops/cli.html)).
## Upgrading to the Latest Connector Version

The generic upgrade steps are outlined in the [upgrading jobs and Flink versions
guide]({{ site.baseurl }}/ops/upgrading.html). For Kafka, you additionally need
to follow these steps (a consumer configuration sketch follows the list):

* Do not upgrade Flink and the Kafka Connector version at the same time.
* Make sure you have a `group.id` configured for your Consumer.
* Set `setCommitOffsetsOnCheckpoints(true)` on the consumer so that read
offsets are committed to Kafka. It's important to do this before stopping and
taking the savepoint. You might have to do a stop/restart cycle on the old
connector version to enable this setting.
* Set `setStartFromGroupOffsets(true)` on the consumer so that we get read
offsets from Kafka. This will only take effect when there is no read offset
in Flink state, which is why the next step is very important.
* Change the assigned `uid` of your source/sink. This makes sure the new
source/sink doesn't read state from the old source/sink operators.
* Start the new job with `--allow-non-restored-state` because we still have the
state of the previous connector version in the savepoint.
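
A sketch of the consumer configuration from the steps above (assuming an existing `StreamExecutionEnvironment` named `env`; the topic, group id, broker address, and `uid` value are placeholders, and the no-argument `setStartFromGroupOffsets()` variant is used):

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");   // needed so offsets can be committed to Kafka

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
consumer.setCommitOffsetsOnCheckpoints(true);  // commit read offsets to Kafka on checkpoints
consumer.setStartFromGroupOffsets();           // use Kafka group offsets when no offset is in Flink state

env.addSource(consumer)
   .uid("kafka-source-v2");                    // new uid so state from the old source is not restored
{% endhighlight %}

The job is then resubmitted from the savepoint with `--allow-non-restored-state`.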

## Troubleshooting

