Skip to content

Commit

Permalink
[FLINK-29914][tests] Wait for Kafka topic creation/deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi authored and gyfora committed Nov 10, 2022
1 parent fa6d188 commit c66ef08
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -429,13 +428,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)));
result.all().get(1, TimeUnit.MINUTES);
result.all().get();
}

private void deleteTestTopic(String topic)
throws ExecutionException, InterruptedException, TimeoutException {
final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
result.all().get(1, TimeUnit.MINUTES);
result.all().get();
}

private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;

Expand All @@ -67,7 +66,6 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);

private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
private static final long DEFAULT_TIMEOUT = 30L;
private static final int RANDOM_STRING_MAX_LENGTH = 50;
private static final int NUM_RECORDS_UPPER_BOUND = 500;
private static final int NUM_RECORDS_LOWER_BOUND = 100;
Expand Down Expand Up @@ -100,10 +98,7 @@ private void createTopic(String topicName, int numPartitions, short replicationF
replicationFactor);
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
try {
kafkaAdminClient
.createTopics(Collections.singletonList(newTopic))
.all()
.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (Exception e) {
throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
}
Expand All @@ -112,10 +107,7 @@ private void createTopic(String topicName, int numPartitions, short replicationF
private void deleteTopic(String topicName) {
LOG.debug("Deleting Kafka topic {}", topicName);
try {
kafkaAdminClient
.deleteTopics(Collections.singletonList(topicName))
.all()
.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
kafkaAdminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
} catch (Exception e) {
if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
adminClient.createTopics(
Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)));
adminClient
.createTopics(
Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)))
.all()
.get();
// Use the admin client to trigger the creation of internal __consumer_offsets topic.
// This makes sure that we won't see unavailable coordinator in the tests.
waitUtil(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@ private KafkaPartitionDataWriter scaleOutTopic(String topicName) throws Exceptio
new TopicPartition(topicName, numPartitions));
} else {
LOG.info("Creating topic '{}'", topicName);
adminClient.createTopics(
Collections.singletonList(new NewTopic(topicName, 1, (short) 1)));
adminClient
.createTopics(Collections.singletonList(new NewTopic(topicName, 1, (short) 1)))
.all()
.get();
return new KafkaPartitionDataWriter(
getKafkaProducerProperties(0), new TopicPartition(topicName, 0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,15 +1253,13 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() {

@Override
public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx)
throws Exception {
public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) {
int partition = getRuntimeContext().getIndexOfThisSubtask();

for (int topicId = 0; topicId < numTopics; topicId++) {
for (int i = 0; i < numElements; i++) {
ctx.collect(
new Tuple3<>(
partition, i, topicNamePrefix + topicId));
new Tuple3<>(partition, i, topics.get(topicId)));
}
}
}
Expand Down Expand Up @@ -1333,8 +1331,7 @@ public void flatMap(
tryExecute(env, "Count elements from the topics");

// delete all topics again
for (int i = 0; i < numTopics; i++) {
final String topic = topicNamePrefix + i;
for (String topic : topics) {
deleteTestTopic(topic);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,11 @@ public void deleteTestTopic(String topic) {

private void tryDelete(AdminClient adminClient, String topic) throws Exception {
try {
adminClient
.deleteTopics(Collections.singleton(topic))
.all()
.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
CommonTestUtils.waitUtil(
() -> {
try {
return adminClient.listTopics().listings()
.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS).stream()
return adminClient.listTopics().listings().get().stream()
.map(TopicListing::name)
.noneMatch((name) -> name.equals(topic));
} catch (Exception e) {
Expand All @@ -164,11 +160,7 @@ private void tryDelete(AdminClient adminClient, String topic) throws Exception {
LOG.info(
"Did not receive delete topic response within {} seconds. Checking if it succeeded",
REQUEST_TIMEOUT_SECONDS);
if (adminClient
.listTopics()
.names()
.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.contains(topic)) {
if (adminClient.listTopics().names().get().contains(topic)) {
throw new Exception("Topic still exists after timeout", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/** Base class for Kafka Table IT Cases. */
Expand Down Expand Up @@ -135,8 +134,16 @@ public void createTestTopic(String topic, int numPartitions, int replicationFact
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
try (AdminClient admin = AdminClient.create(properties)) {
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, (short) replicationFactor)));
Collections.singletonList(
new NewTopic(topic, numPartitions, (short) replicationFactor)))
.all()
.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Fail to create topic [%s partitions: %d replication factor: %d].",
topic, numPartitions, replicationFactor),
e);
}
}

Expand All @@ -145,7 +152,7 @@ public Map<TopicPartition, OffsetAndMetadata> getConsumerOffset(String groupId)
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
try (AdminClient admin = AdminClient.create(properties)) {
ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(groupId);
return result.partitionsToOffsetAndMetadata().get(20, TimeUnit.SECONDS);
return result.partitionsToOffsetAndMetadata().get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Fail to get consumer offsets with the group id [%s].", groupId),
Expand All @@ -157,7 +164,9 @@ public void deleteTestTopic(String topic) {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
try (AdminClient admin = AdminClient.create(properties)) {
admin.deleteTopics(Collections.singletonList(topic));
admin.deleteTopics(Collections.singletonList(topic)).all().get();
} catch (Exception e) {
throw new IllegalStateException(String.format("Fail to delete topic [%s].", topic), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,16 @@ public void createTopic(int replicationFactor, int numPartitions, String topic)
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, container.getBootstrapServers());
try (AdminClient admin = AdminClient.create(properties)) {
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, (short) replicationFactor)));
Collections.singletonList(
new NewTopic(topic, numPartitions, (short) replicationFactor)))
.all()
.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Fail to create topic [%s partitions: %d replication factor: %d].",
topic, numPartitions, replicationFactor),
e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ public void testKafka() throws Exception {
// create the required topics
final short replicationFactor = 1;
admin.createTopics(
Lists.newArrayList(
new NewTopic(inputTopic, 1, replicationFactor),
new NewTopic(outputTopic, 1, replicationFactor)));
Lists.newArrayList(
new NewTopic(inputTopic, 1, replicationFactor),
new NewTopic(outputTopic, 1, replicationFactor)))
.all()
.get();

producer.send(new ProducerRecord<>(inputTopic, 1));
producer.send(new ProducerRecord<>(inputTopic, 2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,20 @@ private void sendMessages(String topic, String... messages) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");

final int numPartitions = 1;
final short replicationFactor = 1;
try (AdminClient admin = AdminClient.create(props)) {
admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)))
.all()
.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Fail to create topic [%s partitions: %d replicas: %d].",
topic, numPartitions, replicationFactor),
e);
}

try (Producer<Bytes, String> producer =
Expand Down

0 comments on commit c66ef08

Please sign in to comment.