[FLINK-25287][connectors/kafka] Use connector testing framework for Kafka tests
PatrickRen authored and leonardBang committed Jan 23, 2022
1 parent 6b227b8 commit 8babbb0
Showing 12 changed files with 412 additions and 438 deletions.
6 changes: 0 additions & 6 deletions flink-connectors/flink-connector-kafka/pom.xml
@@ -224,12 +224,6 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-testing</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>

     <build>
@@ -17,8 +17,8 @@

 package org.apache.flink.connector.kafka.sink;

-import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLoggerExtension;

 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
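For context, `TestLoggerExtension` (now imported from `org.apache.flink.util` instead of the old testing-framework package) is a JUnit 5 extension. A minimal sketch of registering it, assuming a hypothetical test class name and registration via `@ExtendWith` (any JUnit 5 extension can be registered this way; the commit does not show this line itself):

```java
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

// Hypothetical test class; registering the extension lets it log around each test run.
@ExtendWith(TestLoggerExtension.class)
class ExampleKafkaSinkTest {

    @Test
    void exampleCase() {
        // test body goes here
    }
}
```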
@@ -26,15 +26,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.connector.kafka.testutils.KafkaMultipleTopicExternalContext;
-import org.apache.flink.connector.kafka.testutils.KafkaSingleTopicExternalContext;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
-import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
-import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
-import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
-import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -72,6 +71,8 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;

+import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION;
+import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;
 import static org.junit.jupiter.api.Assertions.assertEquals;

 /** Unit test class for {@link KafkaSource}. */
@@ -265,7 +266,7 @@ class IntegrationTests extends SourceTestSuiteBase<String> {
         MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

         // Defines external system
-        @ExternalSystem
+        @TestExternalSystem
         DefaultContainerizedExternalSystem<KafkaContainer> kafka =
                 DefaultContainerizedExternalSystem.builder()
                         .fromContainer(
@@ -276,14 +277,16 @@ class IntegrationTests extends SourceTestSuiteBase<String> {
         // Defines 2 external context factories, so test cases will be invoked twice using these
         // two kinds of external contexts.
         @SuppressWarnings("unused")
-        @ExternalContextFactory
-        KafkaSingleTopicExternalContext.Factory singleTopic =
-                new KafkaSingleTopicExternalContext.Factory(kafka.getContainer());
+        @TestContext
+        KafkaSourceExternalContextFactory singleTopic =
+                new KafkaSourceExternalContextFactory(
+                        kafka.getContainer(), Collections.emptyList(), PARTITION);

         @SuppressWarnings("unused")
-        @ExternalContextFactory
-        KafkaMultipleTopicExternalContext.Factory multipleTopic =
-                new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer());
+        @TestContext
+        KafkaSourceExternalContextFactory multipleTopic =
+                new KafkaSourceExternalContextFactory(
+                        kafka.getContainer(), Collections.emptyList(), TOPIC);
     }

     // -----------------
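To see how the renamed pieces fit together, here is a minimal sketch of a suite-based integration test using the new `connector.testframe` annotations. The class name, Docker image tag, and the trailing `build()` call are assumptions; the annotation names and framework classes come from the diff above:

```java
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Hypothetical skeleton: the suite base class supplies the test cases;
// the annotated fields tell the framework what to run them against.
class ExampleSourceIntegrationTest extends SourceTestSuiteBase<String> {

    // Defines the Flink runtime the suite submits jobs to.
    @TestEnv
    MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

    // Defines the external system under test, started as a container.
    @TestExternalSystem
    DefaultContainerizedExternalSystem<KafkaContainer> kafka =
            DefaultContainerizedExternalSystem.builder()
                    .fromContainer(
                            new KafkaContainer(
                                    DockerImageName.parse("confluentinc/cp-kafka:7.0.1")))
                    .build(); // build() is assumed here

    // One or more @TestContext factories would follow, e.g. the
    // KafkaSourceExternalContextFactory fields shown in the diff above.
}
```

Each `@TestContext` field multiplies the suite: every test case runs once per declared context factory, which is why the Kafka test above declares both a PARTITION-mode and a TOPIC-mode factory.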

This file was deleted.

@@ -18,18 +18,18 @@

 package org.apache.flink.connector.kafka.testutils;

-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;

 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;

 import java.nio.charset.StandardCharsets;
-import java.util.Collection;
+import java.util.List;
 import java.util.Properties;

 /** Source split data writer for writing test data into Kafka topic partitions. */
-public class KafkaPartitionDataWriter implements SourceSplitDataWriter<String> {
+public class KafkaPartitionDataWriter implements ExternalSystemSplitDataWriter<String> {

     private final KafkaProducer<byte[], byte[]> kafkaProducer;
     private final TopicPartition topicPartition;
@@ -40,7 +40,7 @@ public KafkaPartitionDataWriter(Properties producerProperties, TopicPartition to
     }

     @Override
-    public void writeRecords(Collection<String> records) {
+    public void writeRecords(List<String> records) {
         for (String record : records) {
             ProducerRecord<byte[], byte[]> producerRecord =
                     new ProducerRecord<>(
@@ -57,4 +57,8 @@ public void writeRecords(Collection<String> records) {
     public void close() {
         kafkaProducer.close();
     }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
 }
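As a usage illustration, a hedged sketch of driving `KafkaPartitionDataWriter` directly. The broker address, topic name, and serializer settings are assumptions; since the writer's producer is typed `KafkaProducer<byte[], byte[]>`, byte-array serializers are the plausible configuration. The constructor and method signatures match the hunks above:

```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaPartitionDataWriterExample {
    public static void main(String[] args) {
        // Assumed producer configuration; adjust the bootstrap address to your setup.
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.setProperty(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // Hypothetical topic/partition; the constructor signature matches the hunk header above.
        KafkaPartitionDataWriter writer =
                new KafkaPartitionDataWriter(props, new TopicPartition("test-topic", 0));
        try {
            // writeRecords now accepts a List<String> (changed from Collection<String>).
            writer.writeRecords(Arrays.asList("alpha", "beta", "gamma"));
        } finally {
            writer.close();
        }
    }
}
```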