Skip to content

Commit

Permalink
[FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar sourc…
Browse files Browse the repository at this point in the history
…e and sink based on the connector testing framework.
  • Loading branch information
syhily authored and tisonkun committed Sep 30, 2022
1 parent 8027b3c commit f0fe85a
Show file tree
Hide file tree
Showing 21 changed files with 479 additions and 162 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
Expand Down
2 changes: 0 additions & 2 deletions flink-connectors/flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ under the License.
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
<include>containers/txnStandalone.conf</include>
</includes>
</configuration>
</execution>
Expand All @@ -324,7 +323,6 @@ under the License.
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
<include>containers/txnStandalone.conf</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,28 @@

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.flink.testutils.junit.SharedObjectsExtension;

import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -44,63 +56,90 @@

/** Tests for using PulsarSink writing to a Pulsar cluster. */
@Category(value = {FailsOnJava11.class})
class PulsarSinkITCase extends PulsarTestSuiteBase {

private static final int PARALLELISM = 1;

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());

// Using this extension for creating shared reference which would be used in source function.
@RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

@ParameterizedTest
@EnumSource(DeliveryGuarantee.class)
void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
// A random topic with partition 4.
String topic = randomAlphabetic(8);
operator().createTopic(topic, 4);
int counts = ThreadLocalRandom.current().nextInt(100, 200);

ControlSource source =
new ControlSource(
sharedObjects,
operator(),
topic,
guarantee,
counts,
Duration.ofMillis(50),
Duration.ofMinutes(5));
PulsarSink<String> sink =
PulsarSink.builder()
.setServiceUrl(operator().serviceUrl())
.setAdminUrl(operator().adminUrl())
.setDeliveryGuarantee(guarantee)
.setTopics(topic)
.setSerializationSchema(flinkSchema(new SimpleStringSchema()))
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);
if (guarantee != DeliveryGuarantee.NONE) {
env.enableCheckpointing(500L);
}
env.addSource(source).sinkTo(sink);
env.execute();
class PulsarSinkITCase {

/** Integration test based on connector testing framework. */
@Nested
class IntegrationTest extends PulsarSinkTestSuiteBase {

@TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

@TestExternalSystem
PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());

@TestSemantics
CheckpointingMode[] semantics =
new CheckpointingMode[] {
CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
};

@TestContext
PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
}

/** Tests for using PulsarSink writing to a Pulsar cluster. */
@Nested
class DeliveryGuaranteeTest extends PulsarTestSuiteBase {

List<String> expectedRecords = source.getExpectedRecords();
List<String> consumedRecords = source.getConsumedRecords();
private static final int PARALLELISM = 1;

assertThat(consumedRecords)
.hasSameSizeAs(expectedRecords)
.containsExactlyInAnyOrderElementsOf(expectedRecords);
@RegisterExtension
private final MiniClusterExtension clusterExtension =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());

// Using this extension for creating shared reference which would be used in source
// function.
@RegisterExtension
final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

@ParameterizedTest
@EnumSource(DeliveryGuarantee.class)
void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
// A random topic with partition 4.
String topic = randomAlphabetic(8);
operator().createTopic(topic, 4);
int counts = ThreadLocalRandom.current().nextInt(100, 200);

ControlSource source =
new ControlSource(
sharedObjects,
operator(),
topic,
guarantee,
counts,
Duration.ofMillis(50),
Duration.ofMinutes(5));
PulsarSink<String> sink =
PulsarSink.builder()
.setServiceUrl(operator().serviceUrl())
.setAdminUrl(operator().adminUrl())
.setDeliveryGuarantee(guarantee)
.setTopics(topic)
.setSerializationSchema(flinkSchema(new SimpleStringSchema()))
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);
if (guarantee != DeliveryGuarantee.NONE) {
env.enableCheckpointing(500L);
}
env.addSource(source).sinkTo(sink);
env.execute();

List<String> expectedRecords = source.getExpectedRecords();
List<String> consumedRecords = source.getConsumedRecords();

assertThat(consumedRecords)
.hasSameSizeAs(expectedRecords)
.containsExactlyInAnyOrderElementsOf(expectedRecords);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
Expand All @@ -34,8 +35,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.experimental.categories.Category;

import static org.apache.pulsar.client.api.SubscriptionType.Shared;

/**
* Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
*/
Expand All @@ -53,7 +52,10 @@ public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<St
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

@TestContext
PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
new PulsarTestContextFactory<>(
pulsar, env -> new MultipleTopicConsumingContext(env, Shared));
PulsarTestContextFactory<String, SharedSubscriptionContext> sharedSubscription =
new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);

@TestContext
PulsarTestContextFactory<String, KeySharedSubscriptionContext> keySharedSubscription =
new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.streaming.api.CheckpointingMode;

Expand Down Expand Up @@ -59,9 +58,7 @@ public static PulsarPartitionSplit createPartitionSplit(

public static PulsarPartitionSplit createPartitionSplit(
String topic, int partitionId, Boundedness boundedness, MessageId latestConsumedId) {
TopicPartition topicPartition =
new TopicPartition(topic, partitionId, TopicRange.createFullRange());

TopicPartition topicPartition = new TopicPartition(topic, partitionId);
StopCursor stopCursor =
boundedness == Boundedness.BOUNDED ? StopCursor.latest() : StopCursor.never();
return new PulsarPartitionSplit(topicPartition, stopCursor, latestConsumedId, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ public String toString() {

@Override
public List<URL> getConnectorJarPaths() {
// We don't need any tests jar definition. They are provided in docker-related environments.
// We don't need any test jars definition. They are provided in docker-related environments.
return Collections.emptyList();
}

@Override
public void close() throws Exception {}
public void close() throws Exception {
// All the topics would be deleted in the PulsarRuntime. No need to manually close them.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.testframe.external.ExternalContext;

import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
Expand All @@ -48,7 +47,6 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -228,12 +226,7 @@ public void deleteTopic(String topic) {
public List<TopicPartition> topicInfo(String topic) {
try {
return client().getPartitionsForTopic(topic).get().stream()
.map(
p ->
new TopicPartition(
topic,
TopicName.getPartitionIndex(p),
TopicRange.createFullRange()))
.map(p -> new TopicPartition(topic, TopicName.getPartitionIndex(p)))
.collect(toList());
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
Expand Down Expand Up @@ -452,7 +445,7 @@ public Configuration sinkConfig(DeliveryGuarantee deliveryGuarantee) {

/** This method is used for test framework. You can't close this operator manually. */
@Override
public void close() throws IOException {
public void close() throws PulsarClientException {
if (admin != null) {
admin.close();
}
Expand Down Expand Up @@ -485,7 +478,7 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) {
}
}

private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) {
private <T> Producer<T> createProducer(String topic, Schema<T> schema) {
ProducerBuilder<T> builder =
client().newProducer(schema)
.topic(topic)
Expand All @@ -496,7 +489,7 @@ private synchronized <T> Producer<T> createProducer(String topic, Schema<T> sche
return sneakyClient(builder::create);
}

private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
private <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
// Create the earliest subscription if it's not existed.
List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic));
if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
Expand Down Expand Up @@ -88,22 +87,30 @@ public void startUp() {
return;
}

// Override the default configuration in container for enabling the Pulsar transaction.
container.withClasspathResourceMapping(
"docker/bootstrap.sh", "/pulsar/bin/bootstrap.sh", BindMode.READ_ONLY);
// Waiting for the Pulsar border is ready.
// Override the default standalone configuration by system environments.
container.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
container.withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
container.withEnv("PULSAR_PREFIX_systemTopicEnabled", "true");
container.withEnv("PULSAR_PREFIX_brokerDeduplicationEnabled", "true");
container.withEnv("PULSAR_PREFIX_defaultNumberOfNamespaceBundles", "1");
// Change the default bootstrap script, it will override the default configuration
// and start a standalone Pulsar without streaming storage and function worker.
container.withCommand(
"sh",
"-c",
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone --no-functions-worker -nss");
// Waiting for the Pulsar broker and the transaction is ready after the container started.
container.waitingFor(
forHttp("/admin/v2/namespaces/public/default")
forHttp(
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions")
.forPort(BROKER_HTTP_PORT)
.forStatusCode(200)
.withStartupTimeout(Duration.ofMinutes(5)));
// Set custom startup script.
container.withCommand("sh /pulsar/bin/bootstrap.sh");

// Start the Pulsar Container.
container.start();
// Append the output to this runtime logger. Used for local debug purpose.
container.followOutput(new Slf4jLogConsumer(LOG).withSeparateOutputStreams());
container.followOutput(new Slf4jLogConsumer(LOG, true));

// Create the operator.
if (boundFlink) {
Expand Down
Loading

0 comments on commit f0fe85a

Please sign in to comment.