
GH-321 / PR 325 - Observation propagation #368

Open
janchristian-haddorp opened this issue Oct 30, 2023 · 4 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@janchristian-haddorp

Currently testing the enhancements added by PR #325. Great work!!

Is there a chance that the observation context will be fully initialized on the consumer side, i.e. without manually calling the API as described below?

When using HTTP calls, tag/tap or Span.current() can be used on the consumer side (Spring Boot 3.1.5). What is different when propagating an observation via a Kafka topic? Or, put differently, how does Kafka limit observation propagation?

Because of the reverse-order nature of the Reactor context, the observation functionality on the KafkaReceiver is limited to a single trace-level log message for each received record.
Restored tracing information will be correlated into logs if the logging system is configured accordingly.
If there is a requirement to continue an observation on the consumer side, the KafkaReceiverObservation.RECEIVER_OBSERVATION API must be used manually in the record processing operator:

[source,java]
--------
KafkaReceiver.create(receiverOptions.subscription(List.of(topic)))
        .receive()
        .flatMap(record -> {
            Observation receiverObservation =
                KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                        KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                        () -> new KafkaRecordReceiverContext(
                                record, "user.receiver", receiverOptions.bootstrapServers()),
                        observationRegistry);
            return Mono.just(record)
                    .flatMap(TARGET_RECORD_HANDLER)
                    .doOnTerminate(receiverObservation::stop)
                    .doOnError(receiverObservation::error)
                    .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
        })
        .subscribe();
--------
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Oct 30, 2023
@patpatpat123

upvoting this

@artembilan
Contributor

When you mention Spring Boot and its WebFlux server, you are not comparing apples with apples in this project.
Reactor Kafka is just a library: you use its API for your own configuration, in contrast to the opinionated setup Spring Boot provides.
There you simply declare a @PostMapping for a Mono method, and the WebFlux server takes care of injecting it in the proper place, together with all the useful infrastructure such as security and/or observation.
With this library you are on your own with that KafkaReceiver.create() and its API for processing.
There is simply no point at which the end user can inject his or her function for record processing and have the expected observation around it.

At the same time, we accept this ticket as a feature improvement, since we also think that something like Mono<Void> receiveAndProcess(Function<Mono<ConsumerRecord<K, V>>, Mono<Void>> userFunction) would do the trick, hiding the observation API behind the scenes.
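To make the intent of such a receiveAndProcess(...) method concrete, here is a minimal JDK-only sketch of the lifecycle it would hide. This is not Reactor Kafka code: Micrometer's Observation is replaced by a FakeObservation counter, Reactor's Mono by plain synchronous calls, and the names ReceiveAndProcessSketch, FakeObservation, and receiveAndProcess are all hypothetical. The point it illustrates is only that the library, not the user, would open an observation per record, run the user's function inside it, and close it on completion or error.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ReceiveAndProcessSketch {

    // Stand-in for a received Kafka record (hypothetical, not the real API).
    record ConsumerRecord(String topic, String value) {}

    // Stand-in for Micrometer's Observation: just counts lifecycle calls.
    static class FakeObservation {
        static final AtomicInteger started = new AtomicInteger();
        static final AtomicInteger stopped = new AtomicInteger();
        static final AtomicInteger errored = new AtomicInteger();
        static FakeObservation start() { started.incrementAndGet(); return new FakeObservation(); }
        void stop() { stopped.incrementAndGet(); }
        void error() { errored.incrementAndGet(); }
    }

    // Hypothetical receiveAndProcess: the library opens an observation per
    // record, invokes the user's processing function inside that scope, and
    // closes the observation on completion or error, so the user never
    // touches the observation API.
    static void receiveAndProcess(List<ConsumerRecord> records,
                                  Function<ConsumerRecord, Void> userFunction) {
        for (ConsumerRecord record : records) {
            FakeObservation observation = FakeObservation.start(); // per-record scope
            try {
                userFunction.apply(record);
                observation.stop();
            } catch (RuntimeException e) {
                observation.error();
                observation.stop();
            }
        }
    }

    public static void main(String[] args) {
        List<ConsumerRecord> batch = List.of(
                new ConsumerRecord("demo-topic", "a"),
                new ConsumerRecord("demo-topic", "b"));
        // User logic runs with an active per-record observation around it.
        receiveAndProcess(batch, record -> null);
        System.out.println(FakeObservation.started.get() + " started, "
                + FakeObservation.stopped.get() + " stopped");
    }
}
```

In the real proposal the user function would be Function<Mono<ConsumerRecord<K, V>>, Mono<Void>> and the stop/error calls would hang off doOnTerminate/doOnError with contextWrite, as in the manual snippet above.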

@sinfull1

sinfull1 commented Dec 7, 2023

upvoting

@maciej-gromul

In general it seems impossible to make "seamless" tracing work with Flux publishers (whether from Reactor Kafka or any other Flux source), since the context is bound to the publisher as a whole, when it is created, and not to its individual records.

Tracing for a Flux will therefore always require significant code changes: instead of providing a handler directly to Flux.flatMap, we need a wrapper around Flux.flatMap that wraps the handler in a Mono.flatMap, so that each record's Mono carries the context created specifically for that single record when it was received.
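The per-record wrapping idea can be shown in isolation with a JDK-only sketch, stripped of Reactor types. A ThreadLocal stands in for the per-record context that ObservationThreadLocalAccessor would restore; wrapHandler, PerRecordContextSketch, and the "trace-for-" values are illustrative names, not library API. The shape mirrors the proposal: the user hands a handler to a wrapper, and the wrapper binds a fresh context before each record and clears it afterwards, so no context leaks from one record to the next.

```java
import java.util.List;
import java.util.function.Consumer;

public class PerRecordContextSketch {

    // Stand-in for the thread-bound observation context that
    // ObservationThreadLocalAccessor would manage in the real setup.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static final StringBuilder LOG = new StringBuilder();

    // Wraps a user handler so each record runs with its own bound context,
    // mirroring the proposed wrapper around Flux.flatMap -> Mono.flatMap.
    static Consumer<String> wrapHandler(Consumer<String> userHandler) {
        return record -> {
            CONTEXT.set("trace-for-" + record); // fresh context per record
            try {
                userHandler.accept(record);
            } finally {
                CONTEXT.remove();               // never leak across records
            }
        };
    }

    public static void main(String[] args) {
        // The user handler can read the record-scoped context while running.
        Consumer<String> handler = record ->
                LOG.append(record).append('@').append(CONTEXT.get()).append(';');
        List.of("r1", "r2").forEach(wrapHandler(handler));
        System.out.println(LOG); // each record saw its own context value
    }
}
```

With a real Flux the binding would happen via contextWrite on the inner per-record Mono rather than a ThreadLocal, but the ownership question is the same: the wrapper, not the user handler, decides the scope of each record's context.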
