Out of the box support for consuming requests within specified timestamps #296

Open
rishavjayswal opened this issue Oct 2, 2022 · 1 comment
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...)

Comments

rishavjayswal commented Oct 2, 2022

It would be useful to have out-of-the-box support in reactor-kafka for consuming messages within a specified timestamp range.
For now I use the following workaround:

ReceiverOptions<ByteBuffer, ByteBuffer> options =
        receiverOptions
            .subscription(topicConfig.getTopics())
            .pollTimeout(Duration.ofMillis(topicConfig.getPollWaitTimeoutMs()))
            .addAssignListener(
                partitions -> {
                  partitions.forEach(partition -> partition.seekToTimestamp(startTimeInMillis));
                });
KafkaReceiver.create(options)
        .receive()
        .takeWhile(record -> record.timestamp() < endTimeInMillis)
        .map(this::handleConsumerRecord);

The problem with this workaround is that takeWhile ends the whole stream as soon as one record fails the condition (record -> record.timestamp() < endTimeInMillis). That record only marks the end of the time window in its own partition. Other partitions in the topic may still have messages below the end timestamp, and those would be left out.

Desired solution

An out-of-the-box method that lets us consume messages for as long as a given predicate holds.
Alternatively, an option to access offset information for all partitions before consuming records would also help: the offsets could be stored in memory and checked against each consumed record.
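
To illustrate the second idea, here is a rough sketch of the in-memory bookkeeping in plain Java. It is not reactor-kafka API: EndOffsetTracker, accept and isDone are hypothetical names, partitions are modelled as plain integers, and the start and end offsets are assumed to have been looked up beforehand (in a real consumer they could presumably come from something like KafkaConsumer.offsetsForTimes).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks, per partition, the exclusive end offset of the window. */
public class EndOffsetTracker {
    // partition -> first offset that lies outside the window (exclusive bound)
    private final Map<Integer, Long> endOffsets;
    // partitions that may still deliver in-window records
    private final Set<Integer> pending = new HashSet<>();

    public EndOffsetTracker(Map<Integer, Long> startOffsets, Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long start = startOffsets.getOrDefault(e.getKey(), 0L);
            // A partition whose window is empty is finished from the start.
            if (start < e.getValue()) {
                pending.add(e.getKey());
            }
        }
    }

    /** Returns true if the record at (partition, offset) is inside the window. */
    public boolean accept(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null || offset >= end) {
            // Past the window (also covers offset gaps): this partition is finished.
            pending.remove(partition);
            return false;
        }
        if (offset == end - 1) {
            pending.remove(partition); // last in-window record of this partition
        }
        return true;
    }

    /** True once every partition has delivered all of its in-window records. */
    public boolean isDone() {
        return pending.isEmpty();
    }
}
```

With something like this, the pipeline could filter on accept(...) and stop once isDone() returns true, instead of stopping at the first record past the end timestamp. Because empty windows are detected up front, a partition with nothing to read does not keep the stream open.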

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Oct 2, 2022
@garyrussell
Contributor

You should be able to achieve your desired behavior by implementing a ConsumerInterceptor and adding it to ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.

/**
 * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case
 * is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
 *
 * <p>
 * This class will get consumer config properties via <code>configure()</code> method, including clientId assigned
 * by KafkaConsumer if not specified in the consumer config. The interceptor implementation needs to be aware that it will be
 * sharing consumer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
 * <p>
 * Exceptions thrown by ConsumerInterceptor methods will be caught, logged, but not propagated further. As a result, if
 * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception,
 * just log the errors.
 * <p>
 * ConsumerInterceptor callbacks are called from the same thread that invokes
 * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
 * <p>
 * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
 */
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

In the interceptor, you could iterate over the partitions and discard the unwanted records, ending with a final record that won't pass your predicate so that takeWhile completes the stream.
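
A minimal sketch of that filtering logic, written as plain Java without Kafka types so it stands on its own. WindowBatchFilter, Rec and onBatch are hypothetical names; in an actual ConsumerInterceptor the same bookkeeping would sit in onConsume and operate on ConsumerRecords. It assumes the set of assigned partitions is known up front.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the per-batch filtering an interceptor could perform. */
public class WindowBatchFilter {
    /** Stand-in for a consumer record (assumption, not a Kafka type). */
    public static final class Rec {
        public final int partition;
        public final long offset;
        public final long timestamp;
        public Rec(int partition, long offset, long timestamp) {
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    private final long endTimeInMillis;
    private final Set<Integer> openPartitions; // partitions not yet past the end timestamp
    private boolean sentinelSent = false;

    public WindowBatchFilter(Set<Integer> assignedPartitions, long endTimeInMillis) {
        this.openPartitions = new HashSet<>(assignedPartitions);
        this.endTimeInMillis = endTimeInMillis;
    }

    /** Drops out-of-window records; once all partitions are closed, passes one through. */
    public Map<Integer, List<Rec>> onBatch(Map<Integer, List<Rec>> batch) {
        Map<Integer, List<Rec>> out = new LinkedHashMap<>();
        Rec sentinel = null;
        for (Map.Entry<Integer, List<Rec>> e : batch.entrySet()) {
            List<Rec> kept = new ArrayList<>();
            for (Rec r : e.getValue()) {
                if (r.timestamp < endTimeInMillis && openPartitions.contains(e.getKey())) {
                    kept.add(r);
                } else if (r.timestamp >= endTimeInMillis) {
                    openPartitions.remove(e.getKey()); // this partition is finished
                    if (sentinel == null) {
                        sentinel = r; // candidate that fails the downstream predicate
                    }
                }
            }
            if (!kept.isEmpty()) {
                out.put(e.getKey(), kept);
            }
        }
        if (openPartitions.isEmpty() && sentinel != null && !sentinelSent) {
            // Re-insert the sentinel's partition last so the sentinel is the
            // final record of the batch and no kept record follows it.
            List<Rec> tail = out.remove(sentinel.partition);
            if (tail == null) {
                tail = new ArrayList<>();
            }
            tail.add(sentinel);
            out.put(sentinel.partition, tail);
            sentinelSent = true;
        }
        return out;
    }
}
```

One limitation of this sketch: a partition that never receives a record at or past the end timestamp stays open, so the sentinel is never emitted and the stream does not complete by itself.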

@garyrussell garyrussell added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Oct 3, 2022