Skip to content

Commit

Permalink
feature: Retry count available in header
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Feb 15, 2022
1 parent 84cf11a commit d8b25a3
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.state.WorkContainer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

/**
 * User-facing helpers for reading parallel-consumer metadata off a {@link ConsumerRecord}.
 */
public class ParallelConsumerRecord {

    // Utility class - static methods only, not instantiable.
    private ParallelConsumerRecord() {
    }

    /**
     * Reads how many times the given record has previously failed processing, as recorded by the
     * framework in the {@link WorkContainer#FAILED_COUNT_HEADER} header.
     *
     * @param record the record to inspect (any key/value types - only the headers are read)
     * @return the failure count from the header, or {@code 0} if the header is absent or malformed
     */
    public static int getFailedCount(final ConsumerRecord<?, ?> record) {
        Headers headers = record.headers();
        Header header = headers.lastHeader(WorkContainer.FAILED_COUNT_HEADER);
        if (header == null) {
            return 0;
        }
        byte[] value = header.value();
        // Defensive: treat a null or too-short header value as "never failed" instead of
        // letting ByteBuffer#getInt throw BufferUnderflowException.
        if (value == null || value.length < Integer.BYTES) {
            return 0;
        }
        return ByteBuffer.wrap(value).getInt();
    }

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.WallClock;
Expand All @@ -13,7 +13,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
Expand All @@ -28,6 +31,7 @@
@EqualsAndHashCode
public class WorkContainer<K, V> implements Comparable<WorkContainer> {

public static final String FAILED_COUNT_HEADER = "X-Confluent-retry-count";
private final String DEFAULT_TYPE = "DEFAULT";

/**
Expand Down Expand Up @@ -89,11 +93,23 @@ public WorkContainer(int epoch, ConsumerRecord<K, V> cr, String workType) {

/**
 * Marks this work as failed: bumps the retry counter (mirrored into the record's
 * {@link #FAILED_COUNT_HEADER} header), stamps the failure time and releases the
 * in-flight flag so the work can be retried.
 *
 * @param clock source of the current time used to record {@code failedAt}
 */
public void fail(WallClock clock) {
    log.trace("Failing {}", this);
    // updateFailedCount() already increments numberOfFailedAttempts - incrementing it
    // here as well would count every failure twice.
    updateFailedCount();
    failedAt = Optional.of(clock.getNow());
    inFlight = false;
}

/**
 * Increments the failure counter and mirrors the new value into the record's headers, so
 * user code (and downstream systems) can read the retry count via
 * {@code ParallelConsumerRecord#getFailedCount}.
 *
 * <p>NOTE(review): this mutates the consumed record's {@link Headers} in place - assumed
 * safe because a record is owned by a single worker at a time; confirm against the
 * processing model.
 */
private void updateFailedCount() {
    numberOfFailedAttempts++;

    // Big-endian 4-byte encoding of the current count.
    byte[] encodedRetries = ByteBuffer.allocate(Integer.BYTES)
            .putInt(getNumberOfFailedAttempts())
            .array();
    Headers headers = this.getCr().headers();
    // Replace any previous value - only the latest count should be present.
    headers.remove(FAILED_COUNT_HEADER);
    headers.add(FAILED_COUNT_HEADER, encodedRetries);
}

public void succeed() {
log.trace("Succeeded {}", this);
inFlight = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.truth.Truth.assertThat;

/**
 * Verifies that the retry-count header is attached to failed records and is readable via
 * {@link ParallelConsumerRecord#getFailedCount}: zero on the first attempt, one after a
 * single failure.
 */
class FailedCountTest extends ParallelEoSStreamProcessorTestBase {

    @Test
    void failedCountTest() {
        primeFirstRecord();

        AtomicInteger iteration = new AtomicInteger();
        // Parameterised types instead of raw ConsumerRecord.
        AtomicReference<ConsumerRecord<String, String>> lastRec = new AtomicReference<>();
        parallelConsumer.poll(consumerRecord -> {
            // Use the value returned by incrementAndGet rather than a separate get().
            int currentIteration = iteration.incrementAndGet();

            lastRec.set(consumerRecord);
            int failedCount = ParallelConsumerRecord.getFailedCount(consumerRecord);

            if (currentIteration == 1) {
                // First attempt - no failures recorded yet.
                assertThat(failedCount).isEqualTo(0);
                throw new RuntimeException("fail 1");
            } else if (currentIteration == 2) {
                // Retried once - header should reflect the single failure.
                assertThat(failedCount).isEqualTo(1);
            }
        });
        waitForOneLoopCycle();
        parallelConsumer.closeDrainFirst();

        ConsumerRecord<String, String> consumerRecord = lastRec.get();
        assertThat(consumerRecord).isNotNull();
        int failedCount = ParallelConsumerRecord.getFailedCount(consumerRecord);
        assertThat(failedCount).isEqualTo(1);
    }
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.confluent.parallelconsumer.examples.core;

/*-
* Copyright (C) 2020 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerRecord;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -91,7 +92,8 @@ void runPollAndProduce() {

// tag::exampleProduce[]
parallelConsumer.pollAndProduce(record -> {
var result = processBrokerRecord(record);
int failedCount = ParallelConsumerRecord.getFailedCount(record);
var result = processBrokerRecord(record);
return new ProducerRecord<>(outputTopic, record.key(), result.payload);
}, consumeProduceResult -> {
log.debug("Message {} saved to broker at offset {}",
Expand Down

0 comments on commit d8b25a3

Please sign in to comment.