Skip to content

Commit

Permalink
Improve handling of missing message attribute (#59)
Browse files Browse the repository at this point in the history
* Improve handling of missing message attribute on virtual queue receive and delete the message
  • Loading branch information
adam-aws committed Feb 18, 2021
1 parent c105771 commit 63ad156
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,13 @@ public HostQueue(String queueUrl) {
}

private void dispatchMessage(Message message) {
String queueName = message.getMessageAttributes().get(VIRTUAL_QUEUE_NAME_ATTRIBUTE).getStringValue();
MessageAttributeValue messageAttributeValue = message.getMessageAttributes().get(VIRTUAL_QUEUE_NAME_ATTRIBUTE);
// Case where a message was sent with missing attribute __AmazonSQSVirtualQueuesClient.QueueName
if (messageAttributeValue == null) {
orphanedMessageHandler.accept(queueUrl, message);
return;
}
String queueName = messageAttributeValue.getStringValue();
VirtualQueue virtualQueue = virtualQueues.get(queueName);
if (virtualQueue != null) {
messageHandlerOptional.map(messageHandler -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.util.IntegrationTest;

public class AmazonSQSVirtualQueuesClientIT extends IntegrationTest {

private static String hostQueueUrl;
private static AmazonSQS client;

BiConsumer<String, Message> orphanedMessageHandlerMock;

@Before
public void setup() {
client = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(sqs).build();
orphanedMessageHandlerMock = mock(BiConsumer.class);
client = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(sqs).withOrphanedMessageHandler(orphanedMessageHandlerMock).build();
hostQueueUrl = client.createQueue(queueNamePrefix + "-HostQueue").getQueueUrl();
}

Expand All @@ -43,7 +51,7 @@ public void expiringVirtualQueue() throws InterruptedException {
.addAttributesEntry(AmazonSQSVirtualQueuesClient.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueueUrl)
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "10");
String virtualQueueUrl = client.createQueue(request).getQueueUrl();

// Do a few long poll receives and validate the queue stays alive.
// We expect empty receives but not errors.
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
Expand Down Expand Up @@ -108,4 +116,26 @@ public void virtualQueueShouldNotExpireDuringLongReceive() throws InterruptedExc
// Delete the queue so we don't get a spurious message about it expiring during the test shutdown
client.deleteQueue(virtualQueueUrl);
}

@Test
public void missingMessageAttributeIsReceivedAndDeleted() throws InterruptedException {
CreateQueueRequest request = new CreateQueueRequest()
.withQueueName("ShortLived")
.addAttributesEntry(AmazonSQSVirtualQueuesClient.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueueUrl)
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "10");
String virtualQueueUrl = client.createQueue(request).getQueueUrl();

ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
.withQueueUrl(virtualQueueUrl)
.withWaitTimeSeconds(20);
SendMessageRequest sendMessageRequest = new SendMessageRequest()
.withQueueUrl(hostQueueUrl)
.withMessageBody("Missing Message attributes!")
.withDelaySeconds(5);

client.sendMessage(sendMessageRequest);
// Message sent with missing attribute is deleted
assertEquals(0, client.receiveMessage(receiveRequest).getMessages().size());
verify(orphanedMessageHandlerMock).accept(any(), any());
}
}

0 comments on commit 63ad156

Please sign in to comment.