
After pausing partition (and stopping its processing), then seeking and resuming it, my Flux is empty #319

Open
texhnolyzze opened this issue Jan 31, 2023 · 1 comment
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...)

Comments


texhnolyzze commented Jan 31, 2023

Expected Behavior

After I pause my partition at offset X, stop processing it, then some time later seek back to X and resume the partition, I expect the Flux to emit records from that offset.
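
For reference, here is a minimal sketch of the sequence I mean (not my real code, just the pause/seek/resume calls I rely on; the class name and the topic/partition/offset values are placeholders):

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

class PauseSeekResumeSketch {

    void pauseThenSeekAndResume(final ReactiveKafkaConsumerTemplate<String, String> consumerTemplate) {
        // Placeholder values; in my real code these come from the record being processed.
        final TopicPartition partition = new TopicPartition("test-topic", 0);
        final long offset = 1L;

        // 1. Pause the partition so no further records are fetched for it.
        consumerTemplate.pause(partition).subscribe();

        // 2. Some time later (from another thread), seek back to the paused record...
        consumerTemplate.seek(partition, offset).subscribe();

        // 3. ...and resume: I expect receive() to emit records from this offset again.
        consumerTemplate.resume(partition).subscribe();
    }
}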

Actual Behavior

Nothing is returned

Steps to Reproduce

I'm trying to implement @RetryableTopic behavior, but with reactor-kafka.
Here is my abstract class that tries to do that (if you don't want to read it, I've included logs at the end; sorry if this code looks rough to you):

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderResult;
import reactor.util.function.Tuple2;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

@Slf4j
public abstract class BaseReactiveKafkaConsumer<V> {

    @VisibleForTesting
    public static boolean LOG_EMPTY_MESSAGE_EXCEPTIONS = true;

    private static final Mono<?> EMPTY = Mono.just(new Object());

    private static final SenderResult<Void> NO_RETRY_SENDER_RESULT = new VoidSenderResult(null);

    private static final Mono<SenderResult<Void>> NO_RETRY = Mono.just(NO_RETRY_SENDER_RESULT);

    protected static final String NUM_RETRIES_HEADER = BaseReactiveKafkaConsumer.class.getName() + "." + "Retries";
    protected static final String NEXT_EXECUTION_HEADER = BaseReactiveKafkaConsumer.class.getName() + "." + "NextExecution";

    protected final ReactiveKafkaProducerTemplate<String, V> producerTemplate;

    protected final int maxRetries;

    protected final int maxRetriesSendRetry;
    protected final int retrySendRetryFixedDelaySeconds;

    private final Map<String, RetryTopicSequence> retryTopicSequenceMap;
    private final Map<TopicPartition, Boolean> pausedPartitions = new ConcurrentHashMap<>();
    private final ReactiveKafkaConsumerTemplate<String, V> consumerTemplate;

    protected BaseReactiveKafkaConsumer(
        final String bootstrapServers,
        final Collection<String> topics,
        final String groupId,
        @SuppressWarnings("rawtypes") final Class<? extends Serializer> serializer,
        @SuppressWarnings("rawtypes") final Class<? extends Deserializer> deserializer,
        final UnaryOperator<ReceiverOptions<String, V>> customizeConsumerOptions,
        final UnaryOperator<SenderOptions<String, V>> customizeSenderOptions,
        final int maxRetries,
        final int backoffInitialDelay,
        final int backoffMultiplier
    ) {
        this(
            bootstrapServers,
            3,
            10,
            topics,
            groupId,
            serializer,
            deserializer,
            customizeConsumerOptions,
            customizeSenderOptions,
            maxRetries,
            backoffInitialDelay,
            backoffMultiplier
        );
    }

    protected BaseReactiveKafkaConsumer(
        final String bootstrapServers,
        final int maxRetriesSendRetry,
        final int retrySendRetryFixedDelaySeconds,
        final Collection<String> topics,
        final String groupId,
        @SuppressWarnings("rawtypes") final Class<? extends Serializer> serializer,
        @SuppressWarnings("rawtypes") final Class<? extends Deserializer> deserializer,
        final UnaryOperator<ReceiverOptions<String, V>> customizeConsumerOptions,
        final UnaryOperator<SenderOptions<String, V>> customizeSenderOptions,
        final int maxRetries,
        final int backoffInitialDelay,
        final int backoffMultiplier
    ) {
        this.maxRetriesSendRetry = maxRetriesSendRetry;
        this.retrySendRetryFixedDelaySeconds = retrySendRetryFixedDelaySeconds;
        this.maxRetries = maxRetries;
        this.retryTopicSequenceMap = initTopics(topics, maxRetries, backoffInitialDelay, backoffMultiplier);
        this.producerTemplate = createProducerTemplate(bootstrapServers, serializer, customizeSenderOptions);
        this.consumerTemplate = createConsumerTemplate(
            bootstrapServers,
            retryTopicSequenceMap.keySet(),
            groupId,
            deserializer,
            customizeConsumerOptions
        );
    }

    @NotNull
    private Map<String, RetryTopicSequence> initTopics(
        final Collection<String> topics,
        final int retries,
        final int backoffInitialDelay,
        final int backoffMultiplier
    ) {
        final Map<String, RetryTopicSequence> mainTopics = topics.stream().collect(
            Collectors.toMap(
                Function.identity(),
                topic -> new RetryTopicSequence(
                    topic,
                    retries,
                    backoffInitialDelay,
                    backoffMultiplier
                )
            )
        );
        final Map<String, RetryTopicSequence> retryTopics = new HashMap<>();
        for (final RetryTopicSequence sequence : mainTopics.values()) {
            for (final String topic : sequence.topicNames) {
                retryTopics.put(topic, sequence);
            }
        }
        mainTopics.putAll(retryTopics);
        return mainTopics;
    }

    @EventListener(ApplicationStartedEvent.class)
    public void setup() {
        consumerTemplate
            .receive()
            .doOnError(
                ex -> log.error("Error receiving record, will retry", ex)
            )
            .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMinutes(1L))) // connection problems
            .groupBy(rec -> rec.receiverOffset().topicPartition()) // group by partition for parallel processing
            .flatMap(
                groupedByPartition -> {
                    if (Boolean.TRUE.equals(pausedPartitions.get(groupedByPartition.key()))) {
                        log.debug("Stop processing partition {} because it's paused", groupedByPartition.key());
                        return Flux.empty();
                    }
                    return groupedByPartition
                               .publishOn(Schedulers.boundedElastic()) // don't block receiver thread
                               .concatMap( // process in order
                                   rec -> {
                                       if (log.isDebugEnabled()) {
                                           log.debug("Start processing {}", describe(rec));
                                       }
                                       return process(rec);
                                   }
                               )
                               .onErrorResume(
                                   ex -> {
                                       if (!(ex instanceof PartitionPausedException)) { // if not paused
                                           log.error("Exception while processing record", logException(ex)); // don't call describe(), as it maybe the cause of exception
                                       }
                                       return Flux.empty();
                                   }
                               );
                }
            )
            .subscribe();
    }

    private Mono<? extends Tuple2<?, SenderResult<Void>>> process(
        final ReceiverRecord<String, V> rec
    ) {
        //noinspection unchecked,rawtypes
        return Mono.just(rec)
                   .flatMap(this::pausePartitionIfNecessary)
                   .<V>handle(
                       (unused, sink) -> KafkaUtils.handleDeserializerException(rec, sink)
                   )
                   .flatMap(unused -> process0(rec)) // invoke user callback
                   .zipWhen(unused -> hookOnCompletionOfRecord(rec, NO_RETRY)) // wait for user callback completion, then invoke our hook
                   .doOnNext(
                       result -> {
                           if (log.isDebugEnabled()) {
                               log.debug("Processed {}. Result {}", describe(rec), result.getT1());
                           }
                       }
                   )
                   .doOnError(
                       ex -> {
                           if (!(ex instanceof PartitionPausedException)) { // no need pause exception
                               log.error(
                                   "Exception while processing {}",
                                   describe(rec),
                                   logException(ex)
                               );
                           }
                       }
                   )
                   .onErrorResume(
                       ex -> {
                           final Mono<? extends Tuple2<?, SenderResult<Void>>> result;
                           if (!(ex instanceof PartitionPausedException)) {
                               result = EMPTY.zipWith(retry(rec)); // retry failed record
                           } else {
                               result = Mono.error(ex); // propagate pause exception downstream to end processing of entire partition
                           }
                           //noinspection unchecked,rawtypes
                           return (Mono) result;
                       }
                   )
                   .switchIfEmpty(
                       Mono.defer(
                           () -> (Mono) EMPTY.zipWith(hookOnCompletionOfRecord(rec, NO_RETRY)) // deserialization exception
                       )
                   );
    }

    @NotNull
    private Mono<ReceiverRecord<String, V>> pausePartitionIfNecessary(final ReceiverRecord<String, V> rec) {
        if (maxRetries == 0) { // no retries (no pauses either)
            return Mono.just(rec);
        }
        final long now = System.currentTimeMillis();
        final long nextExecution = getNextExecution(rec, now);
        if (log.isDebugEnabled()) {
            log.debug(
                "Next execution is {} for {}",
                Instant.ofEpochMilli(nextExecution).atZone(ZoneId.systemDefault()),
                describe(rec)
            );
        }
        if (now < nextExecution) { // execution time has not come yet, pause partition
            final TopicPartition partition = rec.receiverOffset().topicPartition();
            pausedPartitions.compute(
                partition,
                (unused1, paused) -> {
                    if (!Boolean.TRUE.equals(paused)) { // if not paused already
                        log.info(
                            "Pausing partition {} for {} milliseconds. Offset {}. Pause triggered by {}",
                            partition,
                            nextExecution - now,
                            rec.offset(),
                            describe(rec)
                        );
                        consumerTemplate.pause(partition).doOnSuccess(
                            unused2 -> log.debug(
                                "Partition {} paused",
                                partition
                            )
                        ).subscribe();
                        Schedulers.parallel().schedule( // schedule partition resume on separate thread
                            () -> resumePartition(partition, rec.offset()),
                            nextExecution - now,
                            TimeUnit.MILLISECONDS
                        );
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("Partition {} already paused. Record {}", partition, describe(rec));
                        }
                    }
                    return true;
                }
            );
            return Mono.error(new PartitionPausedException()); // propagate pause exception downstream
        } else { // execution time has come
            if (log.isDebugEnabled()) {
                log.debug("No need to pause {}", describe(rec));
            }
            return Mono.just(rec);
        }
    }

    private void resumePartition(
        final TopicPartition partition,
        final long offset
    ) {
        consumerTemplate.paused().any(partition::equals).doOnSuccess(
            actuallyPaused -> {
                if (Boolean.FALSE.equals(actuallyPaused)) {
                    log.error("Partition {} resuming while not actually paused", partition);
                }
            }
        ).subscribe();
        consumerTemplate
            .seek(partition, offset)
            .subscribe(); // seek partition to current record
        consumerTemplate
            .resume(partition)
            .doOnSuccess(
                unused -> {
                    log.info("Partition {} resumed", partition);
                    pausedPartitions.put(partition, false); // clear paused flag
                }
            )
            .subscribe();
    }

    protected long getNextExecution(final ReceiverRecord<String, V> rec, final long now) {
        final Header header = rec.headers().lastHeader(NEXT_EXECUTION_HEADER);
        final long nextExecution;
        if (header == null) {
            nextExecution = now;
        } else {
            nextExecution = Longs.fromByteArray(header.value());
        }
        return nextExecution;
    }

    @NotNull
    private Mono<SenderResult<Void>> retry(final ReceiverRecord<String, V> rec) {
        final int numRetries = getNumRetries(rec);
        if (log.isDebugEnabled()) {
            log.debug("Current number of retries is {} for {}", numRetries, describe(rec));
        }
        final Mono<SenderResult<Void>> result;
        if (numRetries >= maxRetries) {
            log.warn("Failed to process (will not retry again) {}", describe(rec));
            result = NO_RETRY;
        } else {
            final ProducerRecord<String, V> retryRecord = getRetryRecord(rec);
            result = sendRetry(rec, retryRecord);
        }
        return hookOnCompletionOfRecord(rec, result);
    }

    @NotNull
    @VisibleForTesting
    protected Mono<SenderResult<Void>> sendRetry(final ReceiverRecord<String, V> rec, final ProducerRecord<String, V> retryRecord) {
        final Mono<SenderResult<Void>> result = producerTemplate.send(retryRecord);
        return result.doOnSuccess(
            retrySenderResult -> {
                if (retrySenderResult.exception() == null) {
                    log.debug(
                        "Successfully sent to retry topic ({}-{}, offset {}) {}",
                        retrySenderResult.recordMetadata().topic(),
                        retrySenderResult.recordMetadata().partition(),
                        retrySenderResult.recordMetadata().offset(),
                        describe(rec)
                    );
                }
            }
        );
    }

    @NotNull
    private ProducerRecord<String, V> getRetryRecord(
        final ReceiverRecord<String, V> rec
    ) {
        final int numRetries = getNumRetries(rec);
        final RetryTopicSequence retrySequence = retryTopicSequenceMap.get(rec.topic());
        final String retryTopic = retrySequence.topicNames.get(numRetries);
        final long delay = retrySequence.delays.get(numRetries);
        return new ProducerRecord<>(
            retryTopic,
            rec.partition(),
            null,
            rec.key(),
            rec.value(),
            ImmutableList.of(
                new RecordHeader(NUM_RETRIES_HEADER, Ints.toByteArray(numRetries + 1)),
                new RecordHeader(NEXT_EXECUTION_HEADER, Longs.toByteArray(System.currentTimeMillis() + delay))
            )
        );
    }

    protected int getNumRetries(final ReceiverRecord<String, V> rec) {
        final Header numRetriesHeader = rec.headers().lastHeader(NUM_RETRIES_HEADER);
        final int numRetries;
        if (numRetriesHeader == null) {
            numRetries = 0;
        } else {
            numRetries = Ints.fromByteArray(numRetriesHeader.value());
        }
        return numRetries;
    }

    @NotNull
    private Mono<SenderResult<Void>> hookOnCompletionOfRecord(
        final ReceiverRecord<String, V> rec,
        final Mono<SenderResult<Void>> result
    ) {
        return result.doOnSuccess(
            sendResult -> {
                if (sendResult == NO_RETRY_SENDER_RESULT) {
                    rec.receiverOffset().acknowledge();
                    if (log.isDebugEnabled()) {
                        log.debug("No need to retry {}", describe(rec));
                    }
                } else if (sendResult.exception() != null) {
                    final ProducerRecord<String, V> retryRecord = getRetryRecord(rec);
                    log.error(
                        "Can't send to retry topic {} {}, scheduling periodic task that will try to do it ({} attempts max with {} seconds fixed delay)",
                        retryRecord.topic(),
                        describe(rec),
                        maxRetriesSendRetry,
                        retrySendRetryFixedDelaySeconds,
                        logException(sendResult.exception())
                    );
                    Mono.delay(Duration.ofSeconds(retrySendRetryFixedDelaySeconds))
                        .flatMap(
                            unused -> sendRetry(rec, retryRecord)
                        )
                        .flatMap(sendRetryResult -> sendRetryResult.exception() == null ?
                                                    Mono.just(sendRetryResult) :
                                                    Mono.error(sendRetryResult.exception()))
                        .doOnError(
                            ex -> log.error(
                                "Still can't send to retry topic {}",
                                describe(rec),
                                logException(ex)
                            )
                        )
                        .retryWhen(Retry.fixedDelay(maxRetriesSendRetry, Duration.ofSeconds(1)))
                        .doOnError(unused -> log.error("Record can't be sent to retry topic, it will probably be lost {}", describe(rec)))
                        .subscribe();
                    rec.receiverOffset().acknowledge();
                }
            }
        );
    }

    protected abstract Mono<?> process0(final ReceiverRecord<String, V> rec);

    protected Object describe(final ReceiverRecord<String, V> rec) {
        return rec;
    }

    @NotNull
    private ReactiveKafkaProducerTemplate<String, V> createProducerTemplate(
        final String bootstrapServers,
        @SuppressWarnings("rawtypes") final Class<? extends Serializer> serializer,
        final UnaryOperator<SenderOptions<String, V>> customizeSenderOptions
    ) {
        final HashMap<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaHeaderRemover.class.getName());
        SenderOptions<String, V> options = SenderOptions.create(properties);
        if (customizeSenderOptions != null) {
            options = customizeSenderOptions.apply(options);
        }
        return new ReactiveKafkaProducerTemplate<>(options);
    }

    @NotNull
    private ReactiveKafkaConsumerTemplate<String, V> createConsumerTemplate(
        final String bootstrapServers,
        final Collection<String> topics,
        final String groupId,
        @SuppressWarnings("rawtypes") final Class<? extends Deserializer> deserializer,
        final UnaryOperator<ReceiverOptions<String, V>> customizeConsumerOptions
    ) {
        final HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, deserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        final Consumer<Collection<ReceiverPartition>> assignListener = partitions -> {
            Flux.fromIterable(partitions).
                map(ReceiverPartition::topicPartition).
                filter(pausedPartitions::containsKey).
                filter(pausedPartitions::get).
                flatMap(partition -> consumerTemplate.resume(partition).doOnSuccess(unused -> pausedPartitions.remove(partition))).
                subscribe();
        };
        final Consumer<Collection<ReceiverPartition>> revokeListener = partitions -> {
            for (final ReceiverPartition partition : partitions) {
                pausedPartitions.remove(partition.topicPartition());
            }
        };
        ReceiverOptions<String, V> options = ReceiverOptions.
                                                       <String, V>create(props).
                                                       addAssignListener(assignListener).
                                                       addRevokeListener(revokeListener).
                                                       subscription(topics);
        if (customizeConsumerOptions != null) {
            options = customizeConsumerOptions.apply(options);
        }
        return new ReactiveKafkaConsumerTemplate<>(options);
    }

    private Throwable logException(final Throwable ex) {
        return LOG_EMPTY_MESSAGE_EXCEPTIONS || !StringUtils.isBlank(ex.getMessage()) ? ex : null;
    }

    private static class RetryTopicSequence {

        private final List<Long> delays;
        private final List<String> topicNames;

        public RetryTopicSequence(
            final String topic,
            final int retries,
            final int backoffInitialDelay,
            final int backoffMultiplier
        ) {
            this.delays = new ArrayList<>(retries);
            this.topicNames = new ArrayList<>(retries);
            long delay = backoffInitialDelay;
            for (int i = 0; i < retries; i++) {
                delays.add(delay * 1000);
                topicNames.add(topic + "-retry-" + (i + 1));
                delay *= backoffMultiplier;
            }
        }

    }

    private static class PartitionPausedException extends RuntimeException {
    }

    @VisibleForTesting
    protected static class VoidSenderResult implements SenderResult<Void> {

        private final Exception exception;

        public VoidSenderResult(final Exception exception) {
            this.exception = exception;
        }

        @Override
        public RecordMetadata recordMetadata() {
            return null;
        }

        @Override
        public Exception exception() {
            return exception;
        }

        @Override
        public Void correlationMetadata() {
            return null;
        }

    }

}

Then I have a test implementation of this class:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.SenderResult;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Component
public class ReactiveKafkaConsumerImpl extends BaseReactiveKafkaConsumer<String> {

    private static final VoidSenderResult ERROR_SENDER_RESULT = new VoidSenderResult(new RuntimeException());

    private final WebClient webClient = WebClient.create();

    private final Set<Integer> expectedToBeSent = new ConcurrentSkipListSet<>();
    private final Set<Integer> actuallySent = new ConcurrentSkipListSet<>();

    private boolean throwExceptions;

    private final Map<ReceiverRecord<String, String>, Integer> sendRetryFailedCount = new ConcurrentHashMap<>();

    protected ReactiveKafkaConsumerImpl() {
        super(
            BaseReactiveKafkaConsumerTest.KAFKA_BOOTSTRAP_SERVERS,
            3, // 3 attempts max to send retry
            1, // with 1-second fixed delay
            Collections.singletonList(BaseReactiveKafkaConsumerTest.TOPIC),
            "test-group",
            StringSerializer.class,
            StringDeserializer.class,
            null,
            null,
            2, // 2 retry topics
            1, // with 1 second delay
            1 // without backoff
        );
    }
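
    // Note: with the constructor arguments above, RetryTopicSequence builds two retry
    // topics, test-topic-retry-1 and test-topic-retry-2, each with a 1000 ms delay
    // (backoffInitialDelay = 1 second, backoffMultiplier = 1, so no actual backoff).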

    /**
     * some mock processing of kafka record
     */
    @Override
    protected Mono<?> process0(final ReceiverRecord<String, String> rec) {
        if (isThrowError(rec)) {
            throw new RuntimeException();
        }
        String uri = "https://jsonplaceholder.typicode.com/posts";
        return webClient.get().uri(uri).retrieve().toBodilessEntity().flatMap(
            response -> {
                if (isThrowError(rec)) {
                    return Mono.error(new RuntimeException());
                }
                return Mono.just(response);
            }
        ).doOnSuccess(
            unused -> {
                log.info("Sent {}", describe(rec));
                actuallySent.add(Integer.valueOf(rec.value()));
                if (actuallySent.containsAll(expectedToBeSent)) { //
                    synchronized (BaseReactiveKafkaConsumerTest.class) {
                        BaseReactiveKafkaConsumerTest.class.notifyAll(); // when we successfully processed all records wake up our test case
                    }
                }
                System.out.println(actuallySent);
            }
        );
    }

    public void setThrowExceptions(final boolean throwExceptions) {
        this.throwExceptions = throwExceptions;
    }

    public Set<Integer> getActuallySent() {
        return actuallySent;
    }

    public Set<Integer> getExpectedToBeSent() {
        return expectedToBeSent;
    }

    /**
     * if throwExceptions=true then randomly decides whether processing should fail
     */
    private boolean isThrowError(final ReceiverRecord<String, String> rec) {
        return throwExceptions && ThreadLocalRandom.current().nextBoolean() && getNumRetries(rec) < maxRetries;
    }

    @Override
    protected Object describe(final ReceiverRecord<String, String> rec) {
        return "record " + rec.value() + " partition " + rec.receiverOffset().topicPartition() + " offset " + rec.offset();
    }

    /**
     * randomly emulates failed sends to the retry topic
     */
    @Override
    protected @NotNull Mono<SenderResult<Void>> sendRetry(
        final ReceiverRecord<String, String> rec,
        final ProducerRecord<String, String> retryRecord
    ) {
        final AtomicBoolean fail = new AtomicBoolean();
        sendRetryFailedCount.compute(
            rec,
            (unused, count) -> {
                final int failedCount = count == null ? 0 : count;
                if (failedCount < maxRetriesSendRetry && ThreadLocalRandom.current().nextBoolean() && throwExceptions) {
                    fail.set(true);
                }
                return fail.get() ? failedCount + 1 : failedCount;
            }
        );
        if (fail.get()) {
            return Mono.just(ERROR_SENDER_RESULT);
        } else {
            return super.sendRetry(rec, retryRecord);
        }
    }

}

And the test itself:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderResult;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;

@EmbeddedKafka(
    partitions = BaseReactiveKafkaConsumerTest.PARTITIONS,
    brokerProperties = {
        "listeners=PLAINTEXT:https://" + BaseReactiveKafkaConsumerTest.KAFKA_HOST + ":" + BaseReactiveKafkaConsumerTest.KAFKA_PORT,
        "port=" + BaseReactiveKafkaConsumerTest.KAFKA_PORT
    }
)
@SpringBootTest(
    classes = BaseReactiveKafkaConsumerTestApplication.class,
    webEnvironment = SpringBootTest.WebEnvironment.NONE,
    properties = {
        "spring.cloud.consul.config.enabled=false",
        "logging.level.ru=debug",
        "logging.level.org.apache.kafka=debug"
    }
)
class BaseReactiveKafkaConsumerTest {

    public static final String KAFKA_PORT = "9092";
    public static final String KAFKA_HOST = "localhost";
    /**
     * for test repetition until stopped
     */
    static int iterations;
    static final int PARTITIONS = 5;
    static final int N = 100;
    static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_HOST + ":" + KAFKA_PORT;
    static final String TOPIC = "test-topic";

    @Autowired
    private ReactiveKafkaConsumerImpl reactiveKafkaConsumer;

    private ReactiveKafkaProducerTemplate<Object, Object> producerTemplate;

    @AfterAll
    static void afterAll() {
        iterations++;
    }

    @BeforeEach
    void setUp() {
        reactiveKafkaConsumer.getActuallySent().clear();
        reactiveKafkaConsumer.getExpectedToBeSent().clear();
        if (producerTemplate == null) {
            final HashMap<String, Object> properties = new HashMap<>();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerTemplate = new ReactiveKafkaProducerTemplate<>(
                SenderOptions.create(properties)
            );
        }
    }

    @Test
    void test() throws InterruptedException {
        BaseReactiveKafkaConsumer.LOG_EMPTY_MESSAGE_EXCEPTIONS = false; // keep log as small as possible
        reactiveKafkaConsumer.setThrowExceptions(true); // tell our mock consumer to fail sometimes
        final List<Mono<SenderResult<Void>>> monos = new ArrayList<>(N);
        for (int i = iterations * N; i < (iterations + 1) * N; i++) {
            monos.add(producerTemplate.send(TOPIC, i % PARTITIONS, null, i + "")); // produce records
            reactiveKafkaConsumer.getExpectedToBeSent().add(i);
        }
        Flux.fromIterable(monos).flatMap(Function.identity()).subscribe(); // send records
        synchronized (BaseReactiveKafkaConsumerTest.class) {
            BaseReactiveKafkaConsumerTest.class.wait(); // wait until we are woken up
        }
        Assertions.assertThat(
            reactiveKafkaConsumer.getActuallySent()
        ).containsAll(
            reactiveKafkaConsumer.getExpectedToBeSent()
        );
    }

}

Logs

In the logs I see that record 5 gets into test-topic-retry-1-0 with offset 0:

2023-01-31 14:04:31.556 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 0) record 5 partition test-topic-0 offset 1

Then the Kafka Fetcher polls it:

2023-01-31 14:04:31.558 DEBUG 16996 --- [ad | test-group] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 0 for partition test-topic-retry-1-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=1, lastStableOffset=1, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=232, buffer=java.nio.HeapByteBuffer[pos=0 lim=232 cap=775]))

Then it starts processing:

2023-01-31 14:04:31.560 DEBUG 16996 --- [oundedElastic-6] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Start processing record 5 partition test-topic-retry-1-0 offset 0

Then the partition is paused (because its execution time has not come yet):

2023-01-31 14:04:31.562  INFO 16996 --- [oundedElastic-6] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Pausing partition test-topic-retry-1-0 for 983 milliseconds. Offset 0. Pause triggered by record 5 partition test-topic-retry-1-0 offset 0
2023-01-31 14:04:31.665 DEBUG 16996 --- [ka-test-group-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Pausing partitions [test-topic-retry-1-0]
2023-01-31 14:04:31.665 DEBUG 16996 --- [ka-test-group-1] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Partition test-topic-retry-1-0 paused

Then the partition is seeked back to the paused record's offset and resumed:

2023-01-31 14:04:32.562  INFO 16996 --- [ka-test-group-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Seeking to offset 0 for partition test-topic-retry-1-0
2023-01-31 14:04:32.562 DEBUG 16996 --- [ka-test-group-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Resuming partitions [test-topic-retry-1-0]
2023-01-31 14:04:32.562  INFO 16996 --- [ka-test-group-1] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Partition test-topic-retry-1-0 resumed

Then the Kafka fetcher polls it again:

2023-01-31 14:04:32.871 DEBUG 16996 --- [ka-test-group-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 0 for partition test-topic-retry-1-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=1, lastStableOffset=1, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=232, buffer=java.nio.HeapByteBuffer[pos=0 lim=232 cap=1474]))

Then it starts processing, fails with an error, and gets sent to the next retry topic (up to this point everything is okay).

After some time record 20 gets into test-topic-retry-1-0 with offset 1:

2023-01-31 14:04:32.877 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 1) record 20 partition test-topic-0 offset 4

The Fetcher polls it, processing starts, and the partition is paused:

2023-01-31 14:04:32.878 DEBUG 16996 --- [ka-test-group-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 1 for partition test-topic-retry-1-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=2, lastStableOffset=2, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=233, buffer=java.nio.HeapByteBuffer[pos=0 lim=233 cap=236]))
2023-01-31 14:04:32.885 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Start processing record 20 partition test-topic-retry-1-0 offset 1
2023-01-31 14:04:32.885  INFO 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Pausing partition test-topic-retry-1-0 for 987 milliseconds. Offset 1. Pause triggered by record 20 partition test-topic-retry-1-0 offset 1
2023-01-31 14:04:32.888 DEBUG 16996 --- [ka-test-group-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Pausing partitions [test-topic-retry-1-0]
2023-01-31 14:04:32.888 DEBUG 16996 --- [ka-test-group-1] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Partition test-topic-retry-1-0 paused

While this partition was paused, some more records got into it:

2023-01-31 14:04:32.886 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 2) record 25 partition test-topic-0 offset 5
2023-01-31 14:04:33.172 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 3) record 30 partition test-topic-0 offset 6
2023-01-31 14:04:33.175 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 4) record 35 partition test-topic-0 offset 7
2023-01-31 14:04:33.253 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 5) record 50 partition test-topic-0 offset 10
2023-01-31 14:04:33.431 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 6) record 70 partition test-topic-0 offset 14
...
...
...
2023-01-31 14:04:33.878 DEBUG 16996 --- [ad | producer-2] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Successfully sent to retry topic (test-topic-retry-1-0, offset 9) record 15 partition test-topic-0 offset 3

Then the partition is seeked back to the paused record (record 20 at offset 1) and resumed:

2023-01-31 14:04:33.900  INFO 16996 --- [ka-test-group-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Seeking to offset 1 for partition test-topic-retry-1-0
2023-01-31 14:04:33.900 DEBUG 16996 --- [ka-test-group-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Resuming partitions [test-topic-retry-1-0]
2023-01-31 14:04:33.900  INFO 16996 --- [ka-test-group-1] .r.e.e.m.c.s.k.BaseReactiveKafkaConsumer : Partition test-topic-retry-1-0 resumed

After that the Kafka fetcher polls what I think are 9 records, offsets 1 through 9 (the fetch starts at offset 1 and highWatermark/lastStableOffset is 10):

2023-01-31 14:04:33.907 DEBUG 16996 --- [ka-test-group-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 1 for partition test-topic-retry-1-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=2097, buffer=java.nio.HeapByteBuffer[pos=0 lim=2097 cap=2100]))

And that's it: no processing happens after that. Records keep getting into test-topic-retry-1-0 and the Kafka fetcher polls them (based on logs like this):

DEBUG 16996 --- [ka-test-group-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 10 for partition test-topic-retry-1-0 returned fetch data

But my processing never runs: the Flux is empty for that partition.
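
To double-check where things stop, I can also log, right after resuming, whether the partition is still in the consumer's paused set and where the consumer thinks it is positioned (a diagnostic sketch only, meant to be dropped into BaseReactiveKafkaConsumer; I'm assuming ReactiveKafkaConsumerTemplate#position is available):

    private void logStateAfterResume(final TopicPartition partition, final long expectedOffset) {
        // Diagnostic only: is the partition still reported as paused after resume()?
        consumerTemplate.paused()
                        .any(partition::equals)
                        .doOnSuccess(stillPaused -> log.info("{} still paused after resume: {}", partition, stillPaused))
                        .subscribe();
        // Diagnostic only: does the consumer position match the offset we seeked to?
        consumerTemplate.position(partition)
                        .doOnSuccess(position -> log.info("{} position after resume: {} (expected {})", partition, position, expectedOffset))
                        .subscribe();
    }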

Sorry if this was a long read. Am I doing something wrong?
Many thanks.

Your Environment

  • Reactor version(s) used: 1.3.13
  • Kafka-clients: 3.1.2
  • JVM version (java -version): temurin-1.8.0_345
  • OS and version (eg uname -a): Windows 10
  • Kafka: kafka_2.13-3.3.1
reactorbot added the ❓need-triage label (This issue needs triage, hasn't been looked at by a team member yet) on Jan 31, 2023
@garyrussell
Contributor

This is way too much code/complexity for reporting a problem.

I suggest you strip it down to a simple reproducer with a single topic and the bare minimum code, so we can more easily follow the logic and the problem you are seeing.

garyrussell added the for/user-attention label (This issue needs user attention) and removed the ❓need-triage label on Feb 1, 2023