GH-321: Add Observation propagation (#325)
* GH-321: Add Observation propagation

Fixes #321

* Start version `1.4` since this requires some API changes and upgrades
* Upgrade to Reactor `3.5.3` for new context propagation API
* Copy-paste (and adjust) `KafkaReceiverObservation` & `KafkaSenderObservation`
infrastructure from Spring for Apache Kafka
* Add an `Observation` handling into a `KafkaSender` with `contextCapture()`
and `KafkaSenderObservation` around `producer.send()`
* There is not yet automatic `Observation` handling on the consumer side, since it is not
clear how to handle each record transparently for the end-user `Flux` downstream
* The `ReactorKafkaObservationTests` demonstrates a single trace propagated from a sender via a
parent `Observation` and restored on the consumer side via `KafkaReceiverObservation`
in the end-user code

* Fix Copyright date in the `build.gradle`

* Remove redundant code from the `ReactorKafkaObservationTests`

* Revert Reactor version
* Revert new version `1.4`
* Don't use `contextCapture()` in the `DefaultKafkaSender`:
it is better to ask end-users to use `contextWrite()` to propagate the parent observation
* `ReceiverObservations` factories to observe `ConsumerRecord` on the end-user side
via `transformDeferred()` on each record

* Revert changes in the `DefaultKafkaSender`

* Upgrade dependencies
* Fix deprecation in the `ConsumerHandler` for `eventScheduler.start()`
* Use generated `producerId` and `receiverId` if `CLIENT_ID_CONFIG` is not provided
* Use an `Observation` in the `DefaultKafkaReceiver` to add a `CONSUMER` span to
the trace provided by the `PRODUCER` in the record.
* Recommend using the `KafkaReceiverObservation.RECEIVER_OBSERVATION` API directly
in end-user code when an observation context is needed around record handling

* Revert `spotless` change in the `build.gradle`

* Some code clean-up
* Exclude `org.mockito` from Micrometer deps since we cannot use
a newer version of Mockito yet

* Add trace log for received consumer record

* Fix conflicts after rebase
* Add docs about observation support

* Fix language in docs

Co-authored-by: Gary Russell <[email protected]>

artembilan and garyrussell authored Oct 26, 2023
1 parent bf33c9f commit 5436ae9
Showing 21 changed files with 1,157 additions and 156 deletions.
8 changes: 8 additions & 0 deletions build.gradle
@@ -175,6 +175,14 @@ configure(rootProject) {
 archivesBaseName = 'reactor-kafka'
 description = 'Reactor Kafka: A reactive API for Apache Kafka'
 
+dependencies {
+	api libs.micrometer.observation
+
+	testImplementation (libs.micrometer.tracing.test) {
+		exclude group: 'org.mockito'
+	}
+}
+
 jar {
 	manifest {
 		attributes 'Automatic-Module-Name': 'reactor.kafka'
7 changes: 5 additions & 2 deletions gradle/libs.versions.toml
@@ -5,9 +5,10 @@ reactorCore = "3.4.33-SNAPSHOT"
 asciidoctor = "3.3.2"
 kafka-doc = "28"
 log4j = "2.17.1"
-micrometer = "1.10.4"
+micrometer = "1.10.10"
 powermock = "2.0.9"
 reactiveStreams = "1.0.3"
+micrometerTracing = '1.1.4'
 
 [libraries]
 kafka = "org.apache.kafka:kafka-clients:3.6.0"
@@ -28,7 +29,9 @@ powermock-core = { module = "org.powermock:powermock-core", version.ref = "powermock" }
 powermock-junit = { module = "org.powermock:powermock-module-junit4", version.ref = "powermock" }
 powermock-mockito = { module = "org.powermock:powermock-api-mockito2", version.ref = "powermock" }
 slf4j = "org.slf4j:slf4j-api:1.7.36"
-testcontainers = "org.testcontainers:kafka:1.16.3"
+testcontainers = "org.testcontainers:kafka:1.19.0"
+micrometer-observation = { module = "io.micrometer:micrometer-observation", version.ref = "micrometer" }
+micrometer-tracing-test = { module = "io.micrometer:micrometer-tracing-integration-test", version.ref = "micrometerTracing" }
 
 [plugins]
 artifactory = { id = "com.jfrog.artifactory", version = "4.27.1" }
36 changes: 36 additions & 0 deletions src/docs/asciidoc/api-guide.adoc
@@ -608,3 +608,39 @@ methods can be invoked after the receive Flux corresponding to the last receive

To enable micrometer metrics for the underlying Kafka Consumers and Producers, add a `MicrometerConsumerListener` to the `ReceiverOptions` or a `MicrometerProducerListener` to the `SenderOptions` respectively.

=== Micrometer Observation

To enable Micrometer observation for produced and consumed records, add an `ObservationRegistry` to the `SenderOptions` and `ReceiverOptions` using the `withObservation()` API.
A custom `KafkaSenderObservationConvention` (and `KafkaReceiverObservationConvention`) can also be set.
See their default implementations in the `KafkaSenderObservation` and `KafkaReceiverObservation`, respectively.
The `DefaultKafkaSenderObservationConvention` exposes two low-cardinality tags: `reactor.kafka.type = sender` and `reactor.kafka.client.id`, which is set to the `ProducerConfig.CLIENT_ID_CONFIG` option or, if that is absent, to the identity hash code of the `DefaultKafkaSender` instance prefixed with `reactor-kafka-sender-`.
The `DefaultKafkaReceiverObservationConvention` exposes two low-cardinality tags: `reactor.kafka.type = receiver` and `reactor.kafka.client.id`, which is set to the `ConsumerConfig.CLIENT_ID_CONFIG` option or, if that is absent, to the identity hash code of the `DefaultKafkaReceiver` instance prefixed with `reactor-kafka-receiver-`.
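
The wiring described above might look like the following sketch; `producerProps` and `consumerProps` are assumed, pre-built configuration maps, and `some-topic` is a placeholder:

[source,java]
--------
// Minimal sketch: attach an ObservationRegistry so produced and consumed
// records are observed. `producerProps`/`consumerProps` are assumed here.
ObservationRegistry registry = ObservationRegistry.create();

SenderOptions<Integer, String> senderOptions =
        SenderOptions.<Integer, String>create(producerProps)
                .withObservation(registry);

ReceiverOptions<Integer, String> receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
                .withObservation(registry)
                .subscription(List.of("some-topic"));
--------

A custom convention, when needed, can be supplied alongside the registry via the respective `withObservation()` overload.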

If a `PropagatingSenderTracingObservationHandler` is configured on the `ObservationRegistry`, the tracing information from the context around a producer record is stored into the record headers before the record is published to the Kafka topic.
If a `PropagatingReceiverTracingObservationHandler` is configured on the `ObservationRegistry`, the tracing information is restored from those Kafka record headers into the context on the receiver side, with a child span.
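
Registering these handlers might be sketched as follows; `tracer` and `propagator` are assumed to come from a Micrometer Tracing bridge (for example, Brave or OpenTelemetry) and are not defined here:

[source,java]
--------
// Hedged sketch: `tracer` and `propagator` are assumed bindings from a
// tracing bridge; only the handler registration itself is shown.
ObservationRegistry registry = ObservationRegistry.create();
registry.observationConfig()
        // PRODUCER side: inject the current trace into outgoing record headers.
        .observationHandler(
                new PropagatingSenderTracingObservationHandler<>(tracer, propagator))
        // CONSUMER side: restore the trace from record headers as a child span.
        .observationHandler(
                new PropagatingReceiverTracingObservationHandler<>(tracer, propagator));
--------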

Because of the reverse-order nature of the Reactor context, the observation functionality on the `KafkaReceiver` is limited to a single `trace` log message for each received record.
Restored tracing information is correlated into logs when the logging system is configured to do so.
To continue an observation on the consumer side, the `KafkaReceiverObservation.RECEIVER_OBSERVATION` API must be used manually in the record-processing operator:

[source,java]
--------
KafkaReceiver.create(receiverOptions.subscription(List.of(topic)))
        .receive()
        .flatMap(record -> {
            // Start a CONSUMER observation for this record; the propagating
            // handler restores the trace from the record headers.
            Observation receiverObservation =
                    KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                            KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                            () -> new KafkaRecordReceiverContext(
                                    record, "user.receiver", receiverOptions.bootstrapServers()),
                            observationRegistry);

            return Mono.just(record)
                    .flatMap(TARGET_RECORD_HANDLER)
                    .doOnTerminate(receiverObservation::stop)
                    .doOnError(receiverObservation::error)
                    // Expose the observation to downstream operators via the Reactor context.
                    .contextWrite(context ->
                            context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
        })
        .subscribe();
--------
