Expose manual stop method in KafkaReceiver #380

Open · wants to merge 4 commits into base: main
8 changes: 8 additions & 0 deletions src/main/java/reactor/kafka/receiver/KafkaReceiver.java
@@ -283,4 +283,12 @@ default Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager t
* @return Mono that completes with the value returned by <code>function</code>
*/
<T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);

/**
 * Explicitly shuts down this {@link KafkaReceiver} and the underlying Kafka {@link Consumer} asynchronously.
 * Invoking this method initiates shutdown: the receiver stops fetching records from the Kafka {@link Consumer} and emits a completion signal downstream.
 *
 * @return a Mono that completes once this {@link KafkaReceiver} has been stopped
 */
Mono<Void> stop();
}
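For reference, a minimal usage sketch of the proposed API (not part of this diff): the receiver is consumed as usual, and stop() is later subscribed to, e.g. during application shutdown, to stop polling and complete the record Flux. The receiverOptions argument, the process(...) helper, and the 30-second timeout are placeholders.

import java.time.Duration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

// Hypothetical usage of the stop() method proposed in this PR.
class ManualStopSketch {

    void consumeUntilShutdown(ReceiverOptions<Integer, String> receiverOptions) {
        KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);

        receiver.receive()
                .doOnNext(record -> {
                    process(record);                        // placeholder business logic
                    record.receiverOffset().acknowledge();  // ack once the record is handled
                })
                .doOnComplete(() -> System.out.println("receive flux completed after stop()"))
                .subscribe();

        // Later, e.g. on application shutdown: stop polling and let in-flight records drain.
        receiver.stop().block(Duration.ofSeconds(30));
    }

    private void process(ReceiverRecord<Integer, String> record) {
        // application-specific handling (placeholder)
    }
}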
@@ -215,7 +215,8 @@ Mono<Void> stop() {

this.consumer.wakeup();
return Mono.<Void>fromRunnable(new CloseEvent(receiverOptions.closeTimeout()))
- .as(flux -> flux.subscribeOn(eventScheduler));
+ .as(flux -> flux.subscribeOn(eventScheduler))
+ .doOnTerminate(() -> sink.emitComplete(ConsumerEventLoop.this));
})
.onErrorResume(e -> {
log.warn("Cancel exception: " + e);
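The doOnTerminate(...) addition above is what lets a manual stop complete the record flux: the call to sink.emitComplete(ConsumerEventLoop.this) suggests the records are backed by a Sinks.Many, with the event loop itself acting as the Sinks.EmitFailureHandler, and completing that sink terminates the Flux handed to downstream subscribers. A standalone sketch of that Reactor pattern (plain Sinks API, not reactor-kafka code):

import reactor.core.publisher.Sinks;

public class SinkCompletionSketch {
    public static void main(String[] args) {
        // Completing the backing sink is what terminates the Flux seen by subscribers.
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

        sink.asFlux().subscribe(
                value -> System.out.println("next: " + value),
                error -> System.err.println("error: " + error),
                () -> System.out.println("completed"));   // reached once emitComplete is called

        sink.emitNext("record-1", Sinks.EmitFailureHandler.FAIL_FAST);
        sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); // subscriber prints "completed"
    }
}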
@@ -47,7 +47,6 @@
* To be exposed as a public class in the next major version (a subject to the API review).
*/
class ConsumerHandler<K, V> {

/** Note: Methods added to this set should also be included in javadoc for {@link KafkaReceiver#doOnConsumer(Function)} */
private static final Set<String> DELEGATE_METHODS = new HashSet<>(Arrays.asList(
"assignment",
@@ -126,10 +125,15 @@ public Flux<ConsumerRecords<K, V>> receive() {
}

public Mono<Void> close() {
if (!consumerEventLoop.isActive.get()) {
return Mono.empty();
}

if (consumerListener != null) {
consumerListener.consumerRemoved(consumerId, consumer);
}
- return consumerEventLoop.stop().doFinally(__ -> eventScheduler.dispose());
+ return consumerEventLoop.stop().doFinally(__ -> eventScheduler.disposeGracefully().subscribe());
}

public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
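The close() change above swaps Scheduler#dispose() for disposeGracefully(). Unlike dispose(), disposeGracefully() returns a Mono<Void> that completes only after pending work has finished, so it must be subscribed to (here via subscribe(), in the sketch below via block()). A minimal illustration, assuming reactor-core 3.5+ where disposeGracefully() is available:

import java.time.Duration;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class GracefulDisposeSketch {
    public static void main(String[] args) {
        Scheduler scheduler = Schedulers.newSingle("event-scheduler");

        scheduler.schedule(() -> System.out.println("in-flight task"));

        // dispose() would shut the scheduler down immediately; disposeGracefully()
        // returns a Mono<Void> that completes once pending tasks have finished,
        // so it takes effect only when subscribed to (or awaited, as here).
        scheduler.disposeGracefully()
                 .timeout(Duration.ofSeconds(5))
                 .block();
    }
}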
@@ -172,6 +172,16 @@ public <T> Mono<T> doOnConsumer(Function<org.apache.kafka.clients.consumer.Consu
return consumerHandler.doOnConsumer(function);
}

@Override
public Mono<Void> stop() {
ConsumerHandler<K, V> consumerHandler = consumerHandlerRef.get();
if (consumerHandler == null) {
log.debug("KafkaReceiver already stopped");
return Mono.empty();
}
return consumerHandler.close().doFinally(__ -> consumerHandlerRef.compareAndSet(consumerHandler, null));
}

private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> function) {
return Flux.usingWhen(
Mono.fromCallable(() -> {
@@ -190,7 +200,7 @@ private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerH
scheduler -> function.apply(scheduler, handler),
Scheduler::dispose
),
- handler -> handler.close().doFinally(__ -> consumerHandlerRef.compareAndSet(handler, null))
+ handler -> handler.close().doFinally(__ -> consumerHandlerRef.set(null))
);
}

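The null check and compareAndSet in the new stop() above make the method safe to call repeatedly and avoid clearing a handler that was created after the one being closed. A standalone sketch of that guard pattern (illustrative types only, not the PR's classes):

import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

// Illustrative guard: stop() is a no-op once the handler is gone, and the reference
// is only cleared if it still points at the handler that was just closed.
class StopGuardSketch {

    private final AtomicReference<AutoCloseable> handlerRef = new AtomicReference<>();

    Mono<Void> stop() {
        AutoCloseable handler = handlerRef.get();
        if (handler == null) {
            return Mono.empty(); // already stopped: nothing to close
        }
        return Mono.<Void>fromRunnable(() -> close(handler))
                .doFinally(signal -> handlerRef.compareAndSet(handler, null));
    }

    private static void close(AutoCloseable handler) {
        try {
            handler.close();
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    }
}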
29 changes: 29 additions & 0 deletions src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
@@ -1474,6 +1474,35 @@ public void addingReactorHookShouldNotBreakBackpressure() throws Exception {
.untilAsserted(() -> assertEquals(4, PassThroughCoreSubscriber.messagesOnNextCount()));
}

@Test
public void stop() throws Exception {
int receiveStartIndex = 0;
int receiveCount = 20;
AtomicBoolean shutdown = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(receiveCount);

KafkaReceiver<Integer, String> receiver = createReceiver();
Flux<ReceiverRecord<Integer, String>> flux = receiver.receive()
.delayElements(Duration.ofMillis(100L))
.flatMap(record -> Mono.delay(Duration.ofMillis(100)).then(Mono.just(record)));
Disposable disposable = subscribe(flux, latch);
sendMessages(receiveStartIndex, receiveCount);

Thread.sleep(1000L);
receiver.stop().subscribe(
ignored -> {
},
e -> fail(e.getMessage()),
() -> shutdown.set(true)
);

assertFalse(shutdown.get());
waitForMessages(latch);
checkConsumedMessages(receiveStartIndex, receiveCount);
assertTrue(shutdown.get());
assertTrue(disposable.isDisposed());
}

private Disposable sendAndWaitForMessages(Flux<? extends ConsumerRecord<Integer, String>> kafkaFlux, int count) throws Exception {
CountDownLatch receiveLatch = new CountDownLatch(count);
Disposable disposable = subscribe(kafkaFlux, receiveLatch);