Skip to content

Commit

Permalink
[FLINK-26126][kafka] Add numRecordsOutErrors counter metric to KafkaWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
JingGe authored and fapaul committed Mar 7, 2022
1 parent 339cd3c commit d07df57
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class KafkaWriter<IN>
private final boolean disabledMetrics;
private final Counter numRecordsSendCounter;
private final Counter numBytesSendCounter;
private final Counter numRecordsOutErrorsCounter;
private final ProcessingTimeService timeService;

// Number of outgoing bytes at the latest metric sync
Expand Down Expand Up @@ -154,6 +155,7 @@ class KafkaWriter<IN>
this.metricGroup = sinkInitContext.metricGroup();
this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.kafkaSinkContext =
new DefaultKafkaSinkContext(
sinkInitContext.getSubtaskId(),
Expand Down Expand Up @@ -410,7 +412,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
FlinkKafkaInternalProducer<byte[], byte[]> producer =
KafkaWriter.this.currentProducer;
mailboxExecutor.execute(
() -> throwException(metadata, exception, producer),
() -> {
numRecordsOutErrorsCounter.inc();
throwException(metadata, exception, producer);
},
"Failed to send data to Kafka");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,42 @@ public void testCurrentSendTimeMetric() throws Exception {
}
}

@Test
public void testNumRecordsOutErrorsCounterMetric() throws Exception {
    Properties properties = getKafkaClientConfiguration();
    final InternalSinkWriterMetricGroup metricGroup =
            InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());

    try (final KafkaWriter<Integer> writer =
            createWriterWithConfiguration(
                    properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
        final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
        org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
                .isEqualTo(0L);

        // A successful write must not touch the error counter.
        writer.write(1, SINK_WRITER_CONTEXT);
        org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
                .isEqualTo(0L);

        final String transactionalId = writer.getCurrentProducer().getTransactionalId();

        // Start a second producer that reuses the writer's transactional id and commit a
        // transaction with it. NOTE(review): presumably this bumps the producer epoch on
        // the broker and fences the writer's producer, so the writer's next transactional
        // send fails — confirm against Kafka's transactional-producer semantics.
        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
                new FlinkKafkaInternalProducer<>(properties, transactionalId)) {

            producer.initTransactions();
            producer.beginTransaction();
            // NOTE(review): getBytes() uses the platform default charset; prefer
            // "2".getBytes(StandardCharsets.UTF_8) for deterministic behavior.
            producer.send(new ProducerRecord<>(topic, "2".getBytes()));
            producer.commitTransaction();
        }

        // The (now fenced) writer fails to deliver this record; the send callback routes
        // the failure through the mailbox and must increment the counter exactly once.
        writer.write(3, SINK_WRITER_CONTEXT);
        writer.flush(false);
        writer.prepareCommit();
        org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
                .isEqualTo(1L);
    }
}

@Test
public void testMetadataPublisher() throws Exception {
List<String> metadataList = new ArrayList<>();
Expand Down

0 comments on commit d07df57

Please sign in to comment.