Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose manual stop method in KafkaReceiver #380

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: implement to stop KafkaReceiver
  • Loading branch information
cjlee38 committed Jan 17, 2024
commit b774616dcf3ed649e9af62ef4c023f829a971b22
8 changes: 8 additions & 0 deletions src/main/java/reactor/kafka/receiver/KafkaReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,12 @@ default Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager t
* @return Mono that completes with the value returned by <code>function</code>
*/
<T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);

/**
 * Explicitly shuts down this {@link KafkaReceiver} and the underlying Kafka {@link Consumer} asynchronously.
 * Invoking this method stops fetching records from the Kafka {@link Consumer} and emits a completion
 * signal downstream.
 *
 * @return Mono that completes when this {@link KafkaReceiver} has been stopped
 */
Mono<Void> stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ Mono<Void> stop() {

this.consumer.wakeup();
return Mono.<Void>fromRunnable(new CloseEvent(receiverOptions.closeTimeout()))
.as(flux -> flux.subscribeOn(eventScheduler));
.as(flux -> flux.subscribeOn(eventScheduler))
.doOnTerminate(() -> sink.emitComplete(ConsumerEventLoop.this));
})
.onErrorResume(e -> {
log.warn("Cancel exception: " + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -48,6 +50,8 @@
*/
class ConsumerHandler<K, V> {

private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

/** Note: Methods added to this set should also be included in javadoc for {@link KafkaReceiver#doOnConsumer(Function)} */
private static final Set<String> DELEGATE_METHODS = new HashSet<>(Arrays.asList(
"assignment",
Expand Down Expand Up @@ -126,10 +130,15 @@ public Flux<ConsumerRecords<K, V>> receive() {
}

public Mono<Void> close() {
if (!consumerEventLoop.isActive.get()) {
return Mono.empty();
}

if (consumerListener != null) {
consumerListener.consumerRemoved(consumerId, consumer);
}
return consumerEventLoop.stop().doFinally(__ -> eventScheduler.dispose());

return consumerEventLoop.stop().doFinally(__ -> eventScheduler.disposeGracefully().subscribe());
}

public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ public <T> Mono<T> doOnConsumer(Function<org.apache.kafka.clients.consumer.Consu
return consumerHandler.doOnConsumer(function);
}

@Override
public Mono<Void> stop() {
    // Snapshot the current handler; a null ref means receive() was never started
    // or a previous stop()/cleanup already cleared it, so there is nothing to do.
    ConsumerHandler<K, V> consumerHandler = consumerHandlerRef.get();
    if (consumerHandler == null) {
        log.debug("KafkaReceiver already stopped");
        return Mono.empty();
    }
    // compareAndSet (rather than set) so that, if a new handler was installed
    // concurrently while this stop was in flight, we do not null out the newer one.
    return consumerHandler.close().doFinally(__ -> consumerHandlerRef.compareAndSet(consumerHandler, null));
}

private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> function) {
return Flux.usingWhen(
Mono.fromCallable(() -> {
Expand All @@ -190,7 +200,7 @@ private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerH
scheduler -> function.apply(scheduler, handler),
Scheduler::dispose
),
handler -> handler.close().doFinally(__ -> consumerHandlerRef.compareAndSet(handler, null))
handler -> handler.close().doFinally(__ -> consumerHandlerRef.set(null))
);
}

Expand Down
27 changes: 27 additions & 0 deletions src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,33 @@ public void addingReactorHookShouldNotBreakBackpressure() throws Exception {
.untilAsserted(() -> assertEquals(4, PassThroughCoreSubscriber.messagesOnNextCount()));
}

@Test
public void stop() throws Exception {
    // Verifies that KafkaReceiver.stop() completes asynchronously, signals completion
    // downstream, and leaves the receive() subscription disposed.
    int receiveStartIndex = 0;
    int receiveCount = 20;
    AtomicBoolean shutdown = new AtomicBoolean();
    CountDownLatch latch = new CountDownLatch(receiveCount);

    KafkaReceiver<Integer, String> receiver = createReceiver();
    // Slow down consumption (100 ms per record) so records are still in flight
    // when stop() is invoked below.
    Disposable disposable = subscribe(receiver.receive().delayElements(Duration.ofMillis(100)), latch);
    sendMessages(receiveStartIndex, receiveCount);

    // NOTE(review): sleep-based synchronization is flaky under CI load — consider
    // latching on first record receipt instead of a fixed 1 s sleep. TODO confirm.
    Thread.sleep(1000L);
    receiver.stop().subscribe(
        __ -> {
            /* do nothing */
        },
        e -> fail(e.getMessage()),
        () -> shutdown.set(true)
    );

    // stop() is expected to be asynchronous: completion must not have been signalled
    // synchronously during subscribe. NOTE(review): this assertion races against a
    // fast shutdown — confirm it is deterministic on slow machines.
    assertFalse(shutdown.get());
    // Expects all sent records to still reach the subscriber after stop() is
    // requested (latch counts every sent message) before completion is observed.
    waitForMessages(latch);
    checkConsumedMessages(receiveStartIndex, receiveCount);
    assertTrue(shutdown.get());
    assertTrue(disposable.isDisposed());
}

private Disposable sendAndWaitForMessages(Flux<? extends ConsumerRecord<Integer, String>> kafkaFlux, int count) throws Exception {
CountDownLatch receiveLatch = new CountDownLatch(count);
Disposable disposable = subscribe(kafkaFlux, receiveLatch);
Expand Down