Skip to content

Commit

Permalink
feat: heartbeat to virtual queues during ReceiveMessage as well (#50)
Browse files Browse the repository at this point in the history
* Confirming #47 (test fails)

Horray for TDD!

* Implement heartbeating during receive on virtual queues

Also add a log message to help debugging in the future

* Fix import

* Added motivation comment
  • Loading branch information
robin-aws committed Jul 3, 2020
1 parent 94cdda4 commit 87ae414
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public static AmazonSQSResponderClientBuilder standard() {
public AmazonSQSResponder build() {
AmazonSQS sqs = customSQS.orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQS deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix, queueHeartbeatInterval);
AmazonSQS virtualQueuesClient = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter).build();
AmazonSQS virtualQueuesClient = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter)
.withHeartbeatIntervalSeconds(queueHeartbeatInterval).build();
return new AmazonSQSResponderClient(virtualQueuesClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ private AmazonSQSTemporaryQueuesClient(AmazonSQS virtualizer, AmazonSQSIdleQueue
public static AmazonSQSTemporaryQueuesClient make(AmazonSQSRequesterClientBuilder builder) {
AmazonSQS sqs = builder.getAmazonSQS().orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQSIdleQueueDeletingClient deleter = new AmazonSQSIdleQueueDeletingClient(sqs, builder.getInternalQueuePrefix(), builder.getQueueHeartbeatInterval());
AmazonSQS virtualizer = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter).build();
AmazonSQS virtualizer = AmazonSQSVirtualQueuesClientBuilder.standard()
.withAmazonSQS(deleter)
.withHeartbeatIntervalSeconds(builder.getQueueHeartbeatInterval())
.build();
AmazonSQSTemporaryQueuesClient temporaryQueuesClient = new AmazonSQSTemporaryQueuesClient(virtualizer, deleter, builder.getInternalQueuePrefix());
AmazonSQSRequesterClient requester = new AmazonSQSRequesterClient(temporaryQueuesClient, builder.getInternalQueuePrefix(), builder.getQueueAttributes());
AmazonSQSResponderClient responder = new AmazonSQSResponderClient(temporaryQueuesClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -90,6 +90,14 @@ class AmazonSQSVirtualQueuesClient extends AbstractAmazonSQSClientWrapper {

private final int maxWaitTimeSeconds;

// This is currently only relevant for receives, since all operations on virtual queues
// are handled locally and heartbeats are cheap. Only receives have any wait time and therefore
// need to heartbeat *during* the operation.
// An alternative approach would be to remove the expiration scheduled task during the receive
// and restore it afterwards, but I prefer the hearbeating approach because it guards against
// threads dying or deadlocking.
private final long heartbeatIntervalSeconds;

private static final String VIRTUAL_QUEUE_NAME_ATTRIBUTE = "__AmazonSQSVirtualQueuesClient.QueueName";

static final BiConsumer<String, Message> DEFAULT_ORPHANED_MESSAGE_HANDLER = (queueName, message) -> {
Expand Down Expand Up @@ -120,12 +128,14 @@ private static ScheduledExecutorService createIdleQueueDeletionExecutor() {
AmazonSQSVirtualQueuesClient(AmazonSQS amazonSqsToBeExtended,
Optional<BiConsumer<String, Message>> messageHandlerOptional,
BiConsumer<String, Message> orphanedMessageHandler,
int hostQueuePollingThreads, int maxWaitTimeSeconds) {
int hostQueuePollingThreads, int maxWaitTimeSeconds,
long heartbeatIntervalSeconds) {
super(amazonSqsToBeExtended);
this.messageHandlerOptional = Objects.requireNonNull(messageHandlerOptional);
this.orphanedMessageHandler = Objects.requireNonNull(orphanedMessageHandler);
this.hostQueuePollingThreads = hostQueuePollingThreads;
this.maxWaitTimeSeconds = maxWaitTimeSeconds;
this.heartbeatIntervalSeconds = heartbeatIntervalSeconds;
}

private Optional<VirtualQueue> getVirtualQueue(String queueUrl) {
Expand Down Expand Up @@ -348,10 +358,22 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) {
heartbeat();
try {
try {
return receiveBuffer.receiveMessageAsync(request)
.get(Optional.ofNullable(request.getWaitTimeSeconds()).orElse(0).longValue(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Fall through to an empty receive
Future<ReceiveMessageResult> future = receiveBuffer.receiveMessageAsync(request);
long waitTimeSeconds = Optional.ofNullable(request.getWaitTimeSeconds()).orElse(0).longValue();
// Necessary to ensure the loop terminates
if (waitTimeSeconds < 0) {
throw new IllegalArgumentException("WaitTimeSeconds cannot be negative: " + waitTimeSeconds);
}
do {
long waitTimeBeforeHeartBeat = Math.min(heartbeatIntervalSeconds, waitTimeSeconds);
try {
return future.get(waitTimeBeforeHeartBeat, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Fall through
}
heartbeat();
waitTimeSeconds -= waitTimeBeforeHeartBeat;
} while (waitTimeSeconds > 0);
} catch (ExecutionException e) {
throw (RuntimeException)e.getCause();
} catch (InterruptedException e) {
Expand All @@ -376,7 +398,7 @@ public DeleteMessageResult deleteMessage(DeleteMessageRequest request) {
public void heartbeat() {
expireFuture.ifPresent(f -> f.cancel(false));
expireFuture = retentionPeriod.map(period ->
executor.schedule(() -> AmazonSQSVirtualQueuesClient.this.deleteQueue(id.getQueueUrl()), period, TimeUnit.SECONDS));
executor.schedule(this::deleteIdleQueue, period, TimeUnit.SECONDS));
}

public DeleteQueueResult deleteQueue() {
Expand All @@ -385,7 +407,12 @@ public DeleteQueueResult deleteQueue() {
expireFuture.ifPresent(f -> f.cancel(false));
return new DeleteQueueResult();
}


private void deleteIdleQueue() {
LOG.info("Deleting idle virtual queue: " + id.getQueueUrl());
deleteQueue();
}

public TagQueueResult tagQueue(TagQueueRequest request) {
tags.putAll(request.getTags());
return new TagQueueResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class AmazonSQSVirtualQueuesClientBuilder {

private int maxWaitTimeSeconds = 20;

private long heartbeatIntervalSeconds = AmazonSQSIdleQueueDeletingClient.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;

private AmazonSQSVirtualQueuesClientBuilder() {
}

Expand Down Expand Up @@ -121,6 +123,19 @@ public AmazonSQSVirtualQueuesClientBuilder withMaxWaitTimeSeconds(int maxWaitTim
return this;
}

public long getHeartbeatIntervalSeconds() {
return heartbeatIntervalSeconds;
}

public void setHeartbeatIntervalSeconds(long heartbeatIntervalSeconds) {
this.heartbeatIntervalSeconds = heartbeatIntervalSeconds;
}

public AmazonSQSVirtualQueuesClientBuilder withHeartbeatIntervalSeconds(long heartbeatIntervalSeconds) {
setHeartbeatIntervalSeconds(heartbeatIntervalSeconds);
return this;
}

/**
* @return Create new instance of builder with all defaults set.
*/
Expand All @@ -130,6 +145,6 @@ public static AmazonSQSVirtualQueuesClientBuilder standard() {

public AmazonSQS build() {
return new AmazonSQSVirtualQueuesClient(amazonSQS, messageHandler, orphanedMessageHandler,
hostQueuePollingThreads, maxWaitTimeSeconds);
hostQueuePollingThreads, maxWaitTimeSeconds, heartbeatIntervalSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,32 @@ public void ReceiveMessageWaitTimeSecondsNull() {
} catch (NullPointerException npe) {
fail("NPE not expected with null WaitTimeSeconds on ReceiveMessageRequest");
}

// Delete the queue so we don't get a spurious message about it expiring during the test shutdown
client.deleteQueue(virtualQueueUrl);
}

@Test
public void virtualQueueShouldNotExpireDuringLongReceive() throws InterruptedException {
CreateQueueRequest request = new CreateQueueRequest()
.withQueueName("ShortLived")
.addAttributesEntry(AmazonSQSVirtualQueuesClient.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueueUrl)
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "10");
String virtualQueueUrl = client.createQueue(request).getQueueUrl();

// Do a single long receive call, longer than the retention period.
// This tests that the queue is still considered in use during the call
// and not just at the beginning.
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
.withQueueUrl(virtualQueueUrl)
.withWaitTimeSeconds(20);
assertEquals(0, client.receiveMessage(receiveRequest).getMessages().size());

// Ensure the queue still exists
client.sendMessage(virtualQueueUrl, "Boy I'm sure glad you didn't get deleted");
assertEquals(1, client.receiveMessage(receiveRequest).getMessages().size());

// Delete the queue so we don't get a spurious message about it expiring during the test shutdown
client.deleteQueue(virtualQueueUrl);
}
}

0 comments on commit 87ae414

Please sign in to comment.