Skip to content

Commit

Permalink
MINOR: Reduce MM2 integration test flakiness due to missing dummy off…
Browse files Browse the repository at this point in the history
…set commits (apache#13838)


Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
gharris1727 committed Jun 15, 2023
1 parent 930744c commit 505c7b6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -101,7 +103,7 @@ public class MirrorConnectorsIntegrationBaseTest {
private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
private static final int NUM_WORKERS = 3;
protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(5000L);
protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
protected static final String BACKUP_CLUSTER_ALIAS = "backup";
protected static final List<Class<? extends Connector>> CONNECTOR_LIST = Arrays.asList(
Expand Down Expand Up @@ -219,7 +221,7 @@ public void startClusters(Map<String, String> additionalMM2Config) throws Except
waitForTopicCreated(backup, "mm2-configs.primary.internal");
waitForTopicCreated(backup, "test-topic-1");
waitForTopicCreated(primary, "test-topic-1");
warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-dummy"));
prepareConsumerGroup(Collections.singletonMap("group.id", "consumer-group-dummy"));

log.info(PRIMARY_CLUSTER_ALIAS + " REST service: {}", primary.endpointForResource("connectors"));
log.info(BACKUP_CLUSTER_ALIAS + " REST service: {}", backup.endpointForResource("connectors"));
Expand Down Expand Up @@ -268,7 +270,7 @@ public void testReplication() throws Exception {
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors, so we don't need to wait for discovery
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);

mm2Config = new MirrorMakerConfig(mm2Props);

Expand Down Expand Up @@ -537,7 +539,7 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {

produceMessages(primary, "test-topic-1");

warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);

String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);

Expand Down Expand Up @@ -566,15 +568,15 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
}

@Test
public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedException {
public void testNoCheckpointsIfNoRecordsAreMirrored() throws Exception {
String consumerGroupName = "consumer-group-no-checkpoints";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);

// ensure there are some records in the topic on the source cluster
produceMessages(primary, "test-topic-1");

// warm up consumers before starting the connectors, so we don't need to wait for discovery
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);

// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
Expand Down Expand Up @@ -631,11 +633,11 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
}

@Test
public void testRestartReplication() throws InterruptedException {
public void testRestartReplication() throws Exception {
String consumerGroupName = "consumer-group-restart";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
Expand All @@ -658,11 +660,11 @@ public void testRestartReplication() throws InterruptedException {
}

@Test
public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
public void testOffsetTranslationBehindReplicationFlow() throws Exception {
String consumerGroupName = "consumer-group-lagging-behind";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
Expand Down Expand Up @@ -1183,17 +1185,29 @@ private void createTopics() {
}
}

/*
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
/**
* Commit offset 0 for all partitions of test-topic-1 for the specified consumer groups on primary and backup clusters.
* <p>This is done to force the MirrorCheckpointConnector to start at a task which checkpoints this group.
* Must be called before {@link #waitUntilMirrorMakerIsRunning} to prevent that method from timing out.
*/
protected void warmUpConsumer(Map<String, Object> consumerProps) {
try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();
}
try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();
protected void prepareConsumerGroup(Map<String, Object> consumerProps) throws Exception {
prepareConsumerGroup(primary.kafka(), consumerProps, "test-topic-1");
prepareConsumerGroup(backup.kafka(), consumerProps, "test-topic-1");
}

private void prepareConsumerGroup(EmbeddedKafkaCluster cluster, Map<String, Object> consumerProps, String topic) throws Exception {
try (Admin client = cluster.createAdminClient()) {
Map<String, TopicDescription> topics = client.describeTopics(Collections.singleton(topic))
.allTopicNames()
.get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
Map<TopicPartition, OffsetAndMetadata> collect = topics.get(topic)
.partitions()
.stream()
.collect(Collectors.toMap(
tpi -> new TopicPartition(topic, tpi.partition()),
ignored -> new OffsetAndMetadata(0L)));
AlterConsumerGroupOffsetsResult alterResult = client.alterConsumerGroupOffsets((String) consumerProps.get("group.id"), collect);
alterResult.all().get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() throws
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need to wait for discovery
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);

mm2Config = new MirrorMakerConfig(mm2Props);

Expand Down Expand Up @@ -229,7 +229,7 @@ public void testCreatePartitionsUseProvidedForwardingAdmin() throws Exception {
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need to wait for discovery
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);

waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
Expand Down Expand Up @@ -262,7 +262,7 @@ public void testSyncTopicConfigUseProvidedForwardingAdmin() throws Exception {
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need to wait for discovery
warmUpConsumer(consumerProps);
prepareConsumerGroup(consumerProps);

waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
Expand Down

0 comments on commit 505c7b6

Please sign in to comment.