Out of the box support for consuming requests within specified timestamps #296
Labels: for/user-attention (this issue needs user attention: feedback, rework, etc.)

reactorbot added the ❓need-triage label on Oct 2, 2022
You should be able to achieve your desired behavior by implementing a `ConsumerInterceptor`:

```java
/**
 * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case
 * is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
 *
 * <p>
 * This class will get consumer config properties via <code>configure()</code> method, including clientId assigned
 * by KafkaConsumer if not specified in the consumer config. The interceptor implementation needs to be aware that it will be
 * sharing consumer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
 * <p>
 * Exceptions thrown by ConsumerInterceptor methods will be caught, logged, but not propagated further. As a result, if
 * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception,
 * just log the errors.
 * <p>
 * ConsumerInterceptor callbacks are called from the same thread that invokes
 * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
 * <p>
 * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
 */
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
```

You could iterate over the partitions and discard the unwanted records, stopping once you reach the first record that does not pass your predicate.
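A minimal sketch of such an interceptor is shown below. It assumes the cutoff is supplied through a custom consumer property; the property name `filter.end.time.ms` and the class name are inventions for illustration, not part of any Kafka API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Drops every record whose timestamp is at or beyond a configured cutoff.
 * The cutoff is read from the (hypothetical) consumer property
 * "filter.end.time.ms" in configure().
 */
public class TimestampWindowInterceptor<K, V> implements ConsumerInterceptor<K, V> {

    private long endTimeInMillis = Long.MAX_VALUE;

    @Override
    public void configure(Map<String, ?> configs) {
        Object cutoff = configs.get("filter.end.time.ms");
        if (cutoff != null) {
            this.endTimeInMillis = Long.parseLong(cutoff.toString());
        }
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> filtered = new HashMap<>();
        // iterate over each partition independently, so one partition
        // reaching the cutoff does not discard records from the others
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<K, V>> kept = new ArrayList<>();
            for (ConsumerRecord<K, V> record : records.records(partition)) {
                if (record.timestamp() < this.endTimeInMillis) {
                    kept.add(record);
                }
            }
            if (!kept.isEmpty()) {
                filtered.put(partition, kept);
            }
        }
        return new ConsumerRecords<>(filtered);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // no-op: filtering does not change how offsets are committed
    }

    @Override
    public void close() {
        // no resources to release
    }
}
```

The interceptor would be registered through the standard `interceptor.classes` consumer property (`ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG`). Note that this filters records but does not stop the consumer; deciding when all partitions are exhausted still has to happen in application code.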
garyrussell added the for/user-attention label and removed the ❓need-triage label on Oct 3, 2022
I think it would be pretty cool if we could have out-of-the-box support in reactor-kafka for consuming messages within a specified timestamp range.

I have the following workaround for now:
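(The workaround code itself did not survive extraction. Based on the predicate quoted below, it was presumably a `takeWhile` over the received flux; the sketch here is a reconstruction under that assumption, with the bootstrap servers, topic name, and `endTimeInMillis` as placeholders:)

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class TimestampBoundedConsumer {

    public static Flux<ReceiverRecord<String, String>> receiveUntil(long endTimeInMillis) {
        ReceiverOptions<String, String> options = ReceiverOptions
                .<String, String>create(Collections.singletonMap(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))
                .subscription(Collections.singleton("my-topic"));

        return KafkaReceiver.create(options)
                .receive()
                // stop consuming at the first record at or past the cutoff;
                // records still below the cutoff in OTHER partitions are
                // also cut off, which is exactly the problem described below
                .takeWhile(record -> record.timestamp() < endTimeInMillis);
    }
}
```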
But this solution poses a problem: when the above condition (`record -> record.timestamp() < endTimeInMillis`) is met for a message, that message might be the last valid message in its partition, while other partitions in the topic could still have messages below the end timestamp that would be left out.

Desired solution
An out-of-the-box method that would let us consume messages as long as a certain predicate holds.

Alternatively, an option to access offset information for all the partitions before consuming records could also help, since those offsets can be stored in memory and checked against each consumed record.
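That alternative can be approximated today with the consumer's `offsetsForTimes` API, reached through reactor-kafka's `KafkaReceiver.doOnConsumer`. A sketch, where `receiver`, `partitions`, and `endTimeInMillis` are placeholders the application already has:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;

public class PartitionBoundaries {

    /**
     * For each partition, look up the earliest offset whose timestamp is
     * >= endTimeInMillis. Every offset before that boundary is inside the
     * window; a null boundary means the whole partition is in range.
     */
    public static Mono<Map<TopicPartition, OffsetAndTimestamp>> boundaries(
            KafkaReceiver<String, String> receiver,
            List<TopicPartition> partitions,
            long endTimeInMillis) {

        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition partition : partitions) {
            query.put(partition, endTimeInMillis);
        }
        // doOnConsumer runs the callback on the consumer thread and
        // exposes the underlying KafkaConsumer
        return receiver.doOnConsumer(consumer -> consumer.offsetsForTimes(query));
    }
}
```

The resulting map can be kept in memory; each consumed record is then compared against its partition's boundary offset, and consumption of a partition stops once the boundary is reached, without affecting the other partitions.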