Skip to content

Commit

Permalink
[FLINK-16572] [pubsub,e2e] Only acknowledge list of messages if the l…
Browse files Browse the repository at this point in the history
…ist is not empty + small style fixes (removal of uncommented code etc)]
  • Loading branch information
Xeli authored and rmetzger committed Apr 21, 2020
1 parent 7269e6c commit a0f349f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testPull() throws Exception {
.build())
.get();

List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 10);
List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 1);
assertEquals(1, receivedMessages.size());
assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;


/**
* A helper class to make managing the testing topics a bit easier.
Expand All @@ -52,7 +52,7 @@ public class PubsubHelper {

private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class);

private TransportChannelProvider channelProvider = null;
private TransportChannelProvider channelProvider;

private TopicAdminClient topicClient;
private SubscriptionAdminClient subscriptionAdminClient;
Expand All @@ -61,10 +61,6 @@ public PubsubHelper(TransportChannelProvider channelProvider) {
this.channelProvider = channelProvider;
}

public TransportChannelProvider getChannelProvider() {
return channelProvider;
}

public TopicAdminClient getTopicAdminClient() throws IOException {
if (topicClient == null) {
TopicAdminSettings topicAdminSettings = TopicAdminSettings.newBuilder()
Expand Down Expand Up @@ -129,14 +125,12 @@ public void createSubscription(String subscriptionProject, String subscription,

deleteSubscription(subscriptionName);

SubscriptionAdminClient adminClient = getSubscriptionAdminClient();

ProjectTopicName topicName = ProjectTopicName.of(topicProject, topic);

PushConfig pushConfig = PushConfig.getDefaultInstance();

LOG.info("CreateSubscription {}", subscriptionName);
getSubscriptionAdminClient().createSubscription(subscriptionName, topicName, pushConfig, 1);
getSubscriptionAdminClient().createSubscription(subscriptionName, topicName, pushConfig, 1).isInitialized();
}

public void deleteSubscription(String subscriptionProject, String subscription) throws IOException {
Expand Down Expand Up @@ -167,35 +161,34 @@ public List<ReceivedMessage> pullMessages(String projectId, String subscriptionI
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
// String projectId = "my-project-id";
// String subscriptionId = "my-subscription-id";
// int numOfMessages = 10; // max number of messages to be pulled
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(maxNumberOfMessages)
.setReturnImmediately(false) // return immediately if messages are not available
.setReturnImmediately(false)
.setSubscription(subscriptionName)
.build();

// use pullCallable().futureCall to asynchronously perform this operation
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
// handle received message
// ...
ackIds.add(message.getAckId());
}
// acknowledge received messages
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// use acknowledgeCallable().futureCall to asynchronously perform this operation
subscriber.acknowledgeCallable().call(acknowledgeRequest);
return pullResponse.getReceivedMessagesList();
List<ReceivedMessage> receivedMessages = subscriber.pullCallable().call(pullRequest).getReceivedMessagesList();
acknowledgeIds(subscriber, subscriptionName, receivedMessages);
return receivedMessages;
}
}

private void acknowledgeIds(SubscriberStub subscriber, String subscriptionName, List<ReceivedMessage> receivedMessages) {
if (receivedMessages.isEmpty()) {
return;
}

List<String> ackIds = receivedMessages.stream().map(ReceivedMessage::getAckId).collect(Collectors.toList());
// acknowledge received messages
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// use acknowledgeCallable().futureCall to asynchronously perform this operation
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}

public Subscriber subscribeToSubscription(String project, String subscription, MessageReceiver messageReceiver) {
Expand Down

0 comments on commit a0f349f

Please sign in to comment.