Skip to content

Commit

Permalink
fix: comparing milleseconds to seconds (#45)
Browse files Browse the repository at this point in the history
* Fix comparing milleseconds to seconds

* heartbeatToQueueIfNecessary logic change, to send heartbeat if no heartbeats or its been more than 2X heartbeatIntervalSeconds

* Removed static import
  • Loading branch information
adam-aws authored Jul 3, 2020
1 parent e6ffd9a commit 94cdda4
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private void heartbeatToQueueIfNecessary(String queueUrl) {
QueueMetadata queueMetadata = queues.get(queueUrl);
if (queueMetadata != null) {
Long lastHeartbeat = queueMetadata.heartbeatTimestamp;
if (lastHeartbeat == null || (System.currentTimeMillis() - lastHeartbeat) > 2 * heartbeatIntervalSeconds) {
if (lastHeartbeat != null && (System.currentTimeMillis() - lastHeartbeat) < heartbeatIntervalSeconds * 1000) {
return;
}
heartbeatToQueue(queueUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -18,6 +19,7 @@
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSQueueUtils;


public class AmazonSQSIdleQueueDeletingIT extends IntegrationTest {

private static AmazonSQSIdleQueueDeletingClient client;
Expand Down Expand Up @@ -65,6 +67,61 @@ public void idleQueueIsDeleted() throws InterruptedException {
Assert.assertTrue("Expected queue to be deleted: " + queueUrl,
SQSQueueUtils.awaitQueueDeleted(sqs, queueUrl, 70, TimeUnit.SECONDS));
}

@Test
public void updatedHeartBeatTag() throws InterruptedException {
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueNamePrefix + "-HeartbeatTag")
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "60");
queueUrl = client.createQueue(createQueueRequest).getQueueUrl();

SendMessageRequest sendMsgRequest = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("hello world");
client.sendMessage(sendMsgRequest);

String initialHeartBeat = getLastHeartbeatTimestamp();

// Wait 2 * heartbeatIntervalSeconds before sending message
// so that heartbeatToQueueIfNecessary calls
// heartbeatToQueue and update LAST_HEARTBEAT_TIMESTAMP_TAG
TimeUnit.SECONDS.sleep(10);
client.sendMessage(sendMsgRequest);

String updatedHeartbeat = getLastHeartbeatTimestamp();

Assert.assertNotEquals(initialHeartBeat, updatedHeartbeat);
}

private String getLastHeartbeatTimestamp() {
return client
.listQueueTags(queueUrl)
.getTags()
.get(AmazonSQSIdleQueueDeletingClient.LAST_HEARTBEAT_TIMESTAMP_TAG);
}

@Test
public void notUpdatedHeartBeatTag() throws InterruptedException {
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueNamePrefix + "-HeartbeatTag")
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "60");
queueUrl = client.createQueue(createQueueRequest).getQueueUrl();

SendMessageRequest sendMsgRequest = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("hello world");
client.sendMessage(sendMsgRequest);


String initialHeartBeat = getLastHeartbeatTimestamp();

// Should skip call to heartbeatToQueue and not update LAST_HEARTBEAT_TIMESTAMP_TAG
client.sendMessage(sendMsgRequest);

String notUpdatedHeartbeat = getLastHeartbeatTimestamp();

Assert.assertEquals(initialHeartBeat, notUpdatedHeartbeat);
}

@Test
public void recreatingQueues() throws InterruptedException {
Expand Down

0 comments on commit 94cdda4

Please sign in to comment.