[FLINK-26126][test] migrate KafkaWriterITCase to AssertJ
JingGe authored and fapaul committed Mar 7, 2022
1 parent d07df57 commit 9725933
Showing 1 changed file with 40 additions and 53 deletions.
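For orientation, here is a condensed sketch of the conversion pattern this commit applies throughout the file, using assertions drawn from the diff itself (illustrative only, not part of the change):

// Before (JUnit 5 / Hamcrest):
// import static org.hamcrest.MatcherAssert.assertThat;
// import static org.junit.jupiter.api.Assertions.assertEquals;
// import static org.junit.jupiter.api.Assertions.assertTrue;
assertTrue(currentSendTime.isPresent());
assertEquals(numRecordsSend.getCount(), 1);
assertThat(numBytesSend.getCount(), greaterThan(0L));
assertThat(committables, hasSize(1));

// After (AssertJ):
// import static org.assertj.core.api.Assertions.assertThat;
assertThat(currentSendTime.isPresent()).isTrue();
assertThat(numRecordsSend.getCount()).isEqualTo(1);
assertThat(numBytesSend.getCount()).isGreaterThan(0L);
assertThat(committables).hasSize(1);

A single static import of org.assertj.core.api.Assertions.assertThat replaces the mix of Hamcrest matchers and JUnit assertions. It also removes the need for the fully qualified org.assertj.core.api.Assertions.assertThat calls that previously worked around the import clash with Hamcrest's assertThat.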
@@ -75,14 +75,7 @@
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
import static org.apache.flink.util.DockerImageVersions.KAFKA;
- import static org.hamcrest.MatcherAssert.assertThat;
- import static org.hamcrest.Matchers.equalTo;
- import static org.hamcrest.Matchers.greaterThan;
- import static org.hamcrest.Matchers.hasSize;
- import static org.hamcrest.Matchers.not;
- import static org.hamcrest.Matchers.sameInstance;
- import static org.junit.jupiter.api.Assertions.assertEquals;
- import static org.junit.jupiter.api.Assertions.assertTrue;
+ import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the standalone KafkaWriter. */
@ExtendWith(TestLoggerExtension.class)
@@ -126,7 +119,7 @@ public void setUp(TestInfo testInfo) {
public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
try (final KafkaWriter<Integer> ignored =
createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
- assertTrue(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent());
+ assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()).isTrue();
}
}

@@ -150,15 +143,15 @@ public void testIncreasingRecordBasedCounters() throws Exception {
final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
- assertEquals(numBytesSend.getCount(), 0L);
- assertEquals(numRecordsSend.getCount(), 0);
- assertEquals(numRecordsWrittenErrors.getCount(), 0);
+ assertThat(numBytesSend.getCount()).isEqualTo(0L);
+ assertThat(numRecordsSend.getCount()).isEqualTo(0);
+ assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);

writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
- assertEquals(numRecordsSend.getCount(), 1);
- assertEquals(numRecordsWrittenErrors.getCount(), 0);
- assertThat(numBytesSend.getCount(), greaterThan(0L));
+ assertThat(numRecordsSend.getCount()).isEqualTo(1);
+ assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+ assertThat(numBytesSend.getCount()).isGreaterThan(0L);
}
}

@@ -173,8 +166,8 @@ public void testCurrentSendTimeMetric() throws Exception {
metricGroup)) {
final Optional<Gauge<Long>> currentSendTime =
metricListener.getGauge("currentSendTime");
- assertTrue(currentSendTime.isPresent());
- assertEquals(currentSendTime.get().getValue(), 0L);
+ assertThat(currentSendTime.isPresent()).isTrue();
+ assertThat(currentSendTime.get().getValue()).isEqualTo(0L);
IntStream.range(0, 100)
.forEach(
(run) -> {
Expand All @@ -188,7 +181,7 @@ public void testCurrentSendTimeMetric() throws Exception {
throw new RuntimeException("Failed writing Kafka record.");
}
});
- assertThat(currentSendTime.get().getValue(), greaterThan(0L));
+ assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
}
}

@@ -202,12 +195,10 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception {
createWriterWithConfiguration(
properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
- org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-         .isEqualTo(0L);
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);

writer.write(1, SINK_WRITER_CONTEXT);
- org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-         .isEqualTo(0L);
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);

final String transactionalId = writer.getCurrentProducer().getTransactionalId();

@@ -223,8 +214,7 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception {
writer.write(3, SINK_WRITER_CONTEXT);
writer.flush(false);
writer.prepareCommit();
- org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-         .isEqualTo(1L);
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
}
}

@@ -243,7 +233,7 @@ public void testMetadataPublisher() throws Exception {
expected.add("testMetadataPublisher-0@" + i);
}
writer.prepareCommit();
- org.assertj.core.api.Assertions.assertThat(metadataList).isEqualTo(expected);
+ assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected);
}
}

@@ -270,15 +260,15 @@ void testLingeringTransaction() throws Exception {
recoveredWriter.flush(false);
Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
recoveredWriter.snapshotState(1);
- assertThat(committables, hasSize(1));
+ assertThat(committables).hasSize(1);
final KafkaCommittable committable = committables.stream().findFirst().get();
- assertThat(committable.getProducer().isPresent(), equalTo(true));
+ assertThat(committable.getProducer().isPresent()).isTrue();

committable.getProducer().get().getObject().commitTransaction();

List<ConsumerRecord<byte[], byte[]>> records =
drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
- assertThat(records, hasSize(1));
+ assertThat(records).hasSize(1);
}

failedWriter.close();
@@ -293,19 +283,18 @@ void testLingeringTransaction() throws Exception {
void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
- assertThat(writer.getProducerPool(), hasSize(0));
+ assertThat(writer.getProducerPool()).hasSize(0);

FlinkKafkaInternalProducer<byte[], byte[]> firstProducer = writer.getCurrentProducer();
writer.flush(false);
Collection<KafkaCommittable> committables = writer.prepareCommit();
writer.snapshotState(0);
- assertThat(committables, hasSize(0));
+ assertThat(committables).hasSize(0);

- assertThat(
-         "Expected same producer",
-         writer.getCurrentProducer(),
-         sameInstance(firstProducer));
- assertThat(writer.getProducerPool(), hasSize(0));
+ assertThat(writer.getCurrentProducer() == firstProducer)
+         .as("Expected same producer")
+         .isTrue();
+ assertThat(writer.getProducerPool()).hasSize(0);
}
}

@@ -315,39 +304,37 @@ void usePoolForTransactional() throws Exception {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
- assertThat(writer.getProducerPool(), hasSize(0));
+ assertThat(writer.getProducerPool()).hasSize(0);

writer.flush(false);
Collection<KafkaCommittable> committables0 = writer.prepareCommit();
writer.snapshotState(1);
- assertThat(committables0, hasSize(1));
+ assertThat(committables0).hasSize(1);
final KafkaCommittable committable = committables0.stream().findFirst().get();
- assertThat(committable.getProducer().isPresent(), equalTo(true));
+ assertThat(committable.getProducer().isPresent()).isTrue();

FlinkKafkaInternalProducer<?, ?> firstProducer =
committable.getProducer().get().getObject();
- assertThat(
-         "Expected different producer",
-         firstProducer,
-         not(sameInstance(writer.getCurrentProducer())));
+ assertThat(firstProducer != writer.getCurrentProducer())
+         .as("Expected different producer")
+         .isTrue();

// recycle first producer, KafkaCommitter would commit it and then return it
- assertThat(writer.getProducerPool(), hasSize(0));
+ assertThat(writer.getProducerPool()).hasSize(0);
firstProducer.commitTransaction();
committable.getProducer().get().close();
- assertThat(writer.getProducerPool(), hasSize(1));
+ assertThat(writer.getProducerPool()).hasSize(1);

writer.flush(false);
Collection<KafkaCommittable> committables1 = writer.prepareCommit();
writer.snapshotState(2);
- assertThat(committables1, hasSize(1));
+ assertThat(committables1).hasSize(1);
final KafkaCommittable committable1 = committables1.stream().findFirst().get();
- assertThat(committable1.getProducer().isPresent(), equalTo(true));
+ assertThat(committable1.getProducer().isPresent()).isTrue();

- assertThat(
-         "Expected recycled producer",
-         firstProducer,
-         sameInstance(writer.getCurrentProducer()));
+ assertThat(firstProducer == writer.getCurrentProducer())
+         .as("Expected recycled producer")
+         .isTrue();
}
}

@@ -361,7 +348,7 @@ void testAbortOnClose() throws Exception {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
writer.write(1, SINK_WRITER_CONTEXT);
- assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(0));
+ assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0);
}

try (final KafkaWriter<Integer> writer =
@@ -372,7 +359,7 @@ void testAbortOnClose() throws Exception {
writer.snapshotState(1L);

// manually commit here, which would only succeed if the first transaction was aborted
- assertThat(committables, hasSize(1));
+ assertThat(committables).hasSize(1);
final KafkaCommittable committable = committables.stream().findFirst().get();
String transactionalId = committable.getTransactionalId();
try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
@@ -381,7 +368,7 @@ void testAbortOnClose() throws Exception {
producer.commitTransaction();
}

- assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(1));
+ assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1);
}
}
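
One note on the producer-identity checks above: the Hamcrest sameInstance matchers are migrated to boolean assertions of the form assertThat(a == b).as("...").isTrue(). AssertJ also offers dedicated identity assertions; a minimal alternative sketch (not part of this commit), reusing the writer and firstProducer variables from the tests above:

// Alternative using AssertJ's identity assertions:
assertThat(writer.getCurrentProducer())
        .as("Expected same producer")
        .isSameAs(firstProducer);
assertThat(writer.getCurrentProducer())
        .as("Expected different producer")
        .isNotSameAs(firstProducer);

On failure, isSameAs/isNotSameAs report both object references, whereas a boolean assertion can only report that true was expected but false was found.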

