Skip to content

Commit

Permalink
Add public constant class (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-aws committed Jul 22, 2021
1 parent 8345cdb commit 898366c
Show file tree
Hide file tree
Showing 15 changed files with 64 additions and 52 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Temporary queues are also automatically deleted if the clients that created them
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
<version>1.2.1</version>
<version>1.2.2</version>
<type>jar</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
<version>1.2.1</version>
<version>1.2.2</version>
<name>Amazon SQS Java Temporary Queues Client</name>
<description>An Amazon SQS client that supports creating lightweight, automatically-deleted temporary queues, for use in common messaging patterns such as Request/Response. See http:https://aws.amazon.com/sqs.</description>
<url>https://github.com/awslabs/amazon-sqs-java-temporary-queues-client</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.function.Consumer;

import com.amazonaws.services.sqs.model.QueueNameExistsException;
import com.amazonaws.services.sqs.util.Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -63,12 +64,6 @@ class AmazonSQSIdleQueueDeletingClient extends AbstractAmazonSQSClientWrapper {

private static final Log LOG = LogFactory.getLog(AmazonSQSIdleQueueDeletingClient.class);

// Publicly visible constants
public static final String IDLE_QUEUE_RETENTION_PERIOD = "IdleQueueRetentionPeriodSeconds";
public static final long MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS = 1;
public static final long HEARTBEAT_INTERVAL_SECONDS_DEFAULT = 5;
public static final long HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE = 1;

static final String IDLE_QUEUE_RETENTION_PERIOD_TAG = "__IdleQueueRetentionPeriodSeconds";

private static final String SWEEPING_QUEUE_DLQ_SUFFIX = "_DLQ";
Expand Down Expand Up @@ -111,15 +106,15 @@ public AmazonSQSIdleQueueDeletingClient(AmazonSQS sqs, String queueNamePrefix, L
this.queueNamePrefix = queueNamePrefix;

if (heartbeatIntervalSeconds != null) {
if (heartbeatIntervalSeconds < HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE) {
if (heartbeatIntervalSeconds < Constants.HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE) {
throw new IllegalArgumentException("Heartbeat Interval Seconds: " +
heartbeatIntervalSeconds +
" must be equal to or bigger than " +
HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE);
Constants.HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE);
}
this.heartbeatIntervalSeconds = heartbeatIntervalSeconds;
} else {
this.heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
this.heartbeatIntervalSeconds = Constants.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
}
}

Expand Down Expand Up @@ -201,7 +196,7 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
List<String> attributeNames = Arrays.asList(QueueAttributeName.ReceiveMessageWaitTimeSeconds.toString(),
QueueAttributeName.VisibilityTimeout.toString());
Map<String, String> createdAttributes = amazonSqsToBeExtended.getQueueAttributes(queueUrl, attributeNames).getAttributes();
createdAttributes.put(IDLE_QUEUE_RETENTION_PERIOD, retentionPeriodString);
createdAttributes.put(Constants.IDLE_QUEUE_RETENTION_PERIOD, retentionPeriodString);

QueueMetadata metadata = new QueueMetadata(queueName, queueUrl, createdAttributes);
queues.put(queueUrl, metadata);
Expand All @@ -214,15 +209,15 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
}

static Optional<Long> getRetentionPeriod(Map<String, String> queueAttributes) {
return Optional.ofNullable(queueAttributes.remove(IDLE_QUEUE_RETENTION_PERIOD))
return Optional.ofNullable(queueAttributes.remove(Constants.IDLE_QUEUE_RETENTION_PERIOD))
.map(Long::parseLong)
.map(AmazonSQSIdleQueueDeletingClient::checkQueueRetentionPeriodBounds);
}

static long checkQueueRetentionPeriodBounds(long retentionPeriod) {
if (retentionPeriod < MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS) {
throw new IllegalArgumentException("The " + IDLE_QUEUE_RETENTION_PERIOD +
" attribute bigger or equal to " + MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS + " seconds");
if (retentionPeriod < Constants.MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS) {
throw new IllegalArgumentException("The " + Constants.IDLE_QUEUE_RETENTION_PERIOD +
" attribute bigger or equal to " + Constants.MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS + " seconds");
}
return retentionPeriod;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.util.Constants;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSQueueUtils;

Expand All @@ -23,9 +24,6 @@
* temporary queue for each response message.
*/
class AmazonSQSRequesterClient implements AmazonSQSRequester {

public static final String RESPONSE_QUEUE_URL_ATTRIBUTE_NAME = "ResponseQueueUrl";

private final AmazonSQS sqs;
private final String queuePrefix;
private final Map<String, String> queueAttributes;
Expand Down Expand Up @@ -70,7 +68,7 @@ public CompletableFuture<Message> sendMessageAndGetResponseAsync(SendMessageRequ
String responseQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

SendMessageRequest requestWithResponseUrl = SQSQueueUtils.copyWithExtraAttributes(request,
Collections.singletonMap(RESPONSE_QUEUE_URL_ATTRIBUTE_NAME,
Collections.singletonMap(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME,
new MessageAttributeValue().withDataType("String").withStringValue(responseQueueUrl)));
// TODO-RS: Should be using sendMessageAsync
sqs.sendMessage(requestWithResponseUrl);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.util.Constants;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -15,7 +17,7 @@ public class AmazonSQSRequesterClientBuilder {
private Map<String, String> queueAttributes = Collections.emptyMap();

private int idleQueueSweepingPeriod = 5;
private long queueHeartbeatInterval = AmazonSQSIdleQueueDeletingClient.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
private long queueHeartbeatInterval = Constants.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
private TimeUnit idleQueueSweepingTimeUnit = TimeUnit.MINUTES;
private long idleQueueRetentionPeriodSeconds = AmazonSQSTemporaryQueuesClientBuilder.IDLE_QUEUE_RETENTION_PERIOD_SECONDS_DEFAULT;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.util.Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand All @@ -24,7 +25,7 @@ public AmazonSQS getAmazonSQS() {

@Override
public void sendResponseMessage(MessageContent request, MessageContent response) {
MessageAttributeValue attribute = request.getMessageAttributes().get(AmazonSQSRequesterClient.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME);
MessageAttributeValue attribute = request.getMessageAttributes().get(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME);

if (attribute != null) {
String replyQueueUrl = attribute.getStringValue();
Expand All @@ -45,7 +46,7 @@ public void sendResponseMessage(MessageContent request, MessageContent response)

@Override
public boolean isResponseMessageRequested(MessageContent requestMessage) {
return requestMessage.getMessageAttributes().containsKey(AmazonSQSRequesterClient.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME);
return requestMessage.getMessageAttributes().containsKey(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.util.Constants;

import java.util.Optional;

public class AmazonSQSResponderClientBuilder {

private Optional<AmazonSQS> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";
private long queueHeartbeatInterval = AmazonSQSIdleQueueDeletingClient.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
private long queueHeartbeatInterval = Constants.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;

private AmazonSQSResponderClientBuilder() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.util.AbstractAmazonSQSClientWrapper;
import com.amazonaws.services.sqs.util.Constants;
import com.amazonaws.services.sqs.util.SQSQueueUtils;

/**
Expand Down Expand Up @@ -124,14 +125,14 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {

Map<String, String> extraQueueAttributes = new HashMap<>();
// Add the retention period to both the host queue and each virtual queue
extraQueueAttributes.put(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, Long.toString(idleQueueRetentionPeriodSeconds));
extraQueueAttributes.put(Constants.IDLE_QUEUE_RETENTION_PERIOD, Long.toString(idleQueueRetentionPeriodSeconds));
String hostQueueUrl = hostQueueUrls.computeIfAbsent(request.getAttributes(), attributes -> {
CreateQueueRequest hostQueueCreateRequest = SQSQueueUtils.copyWithExtraAttributes(request, extraQueueAttributes);
hostQueueCreateRequest.setQueueName(prefix + '-' + hostQueueUrls.size());
return amazonSqsToBeExtended.createQueue(hostQueueCreateRequest).getQueueUrl();
});

extraQueueAttributes.put(AmazonSQSVirtualQueuesClient.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueueUrl);
extraQueueAttributes.put(Constants.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueueUrl);
// The host queue takes care of all the other queue attributes, so don't specify them when creating the virtual
// queue or else the client may think we're trying to set them independently!
CreateQueueRequest createVirtualQueueRequest = new CreateQueueRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.amazonaws.services.sqs.model.UntagQueueRequest;
import com.amazonaws.services.sqs.model.UntagQueueResult;
import com.amazonaws.services.sqs.util.AbstractAmazonSQSClientWrapper;
import com.amazonaws.services.sqs.util.Constants;
import com.amazonaws.services.sqs.util.DaemonThreadFactory;
import com.amazonaws.services.sqs.util.ReceiveQueueBuffer;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
Expand All @@ -49,7 +50,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

import static com.amazonaws.services.sqs.AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD;
import static com.amazonaws.services.sqs.util.Constants.IDLE_QUEUE_RETENTION_PERIOD;

/**
* An AmazonSQS wrapper that adds support for "virtual" queues, which are logical
Expand Down Expand Up @@ -81,8 +82,6 @@ class AmazonSQSVirtualQueuesClient extends AbstractAmazonSQSClientWrapper {

private static final Log LOG = LogFactory.getLog(AmazonSQSVirtualQueuesClient.class);

public static final String VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE = "HostQueueUrl";

// This is just protection against bad logic that creates unbounded queues.
public static final int MAXIMUM_VIRTUAL_QUEUES_COUNT = 1_000_000;

Expand Down Expand Up @@ -154,13 +153,13 @@ private Optional<VirtualQueue> getVirtualQueue(String queueUrl) {

@Override
public CreateQueueResult createQueue(CreateQueueRequest request) {
String hostQueueUrl = request.getAttributes().get(VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE);
String hostQueueUrl = request.getAttributes().get(Constants.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE);
if (hostQueueUrl == null) {
return amazonSqsToBeExtended.createQueue(request);
}

Map<String, String> attributes = new HashMap<>(request.getAttributes());
attributes.remove(VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE);
attributes.remove(Constants.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE);

Optional<Long> retentionPeriod = AmazonSQSIdleQueueDeletingClient.getRetentionPeriod(attributes);

Expand Down Expand Up @@ -341,7 +340,7 @@ public VirtualQueueID getID() {
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest request) {
List<String> attributeNames = request.getAttributeNames();
boolean includeHostQueue =
attributeNames.remove(VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE) ||
attributeNames.remove(Constants.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE) ||
attributeNames.contains("All");
boolean includeRetentionPeriod = retentionPeriod.isPresent() &&
(attributeNames.contains(IDLE_QUEUE_RETENTION_PERIOD) ||
Expand All @@ -352,7 +351,7 @@ public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest req
.withAttributeNames(attributeNames);
GetQueueAttributesResult result = amazonSqsToBeExtended.getQueueAttributes(hostQueueRequest);
if (includeHostQueue) {
result.getAttributes().put(VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueue.queueUrl);
result.getAttributes().put(Constants.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueue.queueUrl);
}
if (includeRetentionPeriod) {
result.getAttributes().put(IDLE_QUEUE_RETENTION_PERIOD, retentionPeriod.get().toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.util.Constants;

import java.util.Optional;
import java.util.function.BiConsumer;
Expand All @@ -17,7 +18,7 @@ public class AmazonSQSVirtualQueuesClientBuilder {

private int maxWaitTimeSeconds = 20;

private long heartbeatIntervalSeconds = AmazonSQSIdleQueueDeletingClient.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
private long heartbeatIntervalSeconds = Constants.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;

private AmazonSQSVirtualQueuesClientBuilder() {
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/amazonaws/services/sqs/util/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.amazonaws.services.sqs.util;

public class Constants {
public static final String RESPONSE_QUEUE_URL_ATTRIBUTE_NAME = "ResponseQueueUrl";
public static final String VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE = "HostQueueUrl";
public static final String IDLE_QUEUE_RETENTION_PERIOD = "IdleQueueRetentionPeriodSeconds";
public static final long MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS = 1;
public static final long HEARTBEAT_INTERVAL_SECONDS_DEFAULT = 5;
public static final long HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.util.Constants;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void idleQueueIsDeleted() throws InterruptedException {
client.startSweeper(requester, responder, 5, TimeUnit.SECONDS, exceptionHandler);
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueNamePrefix + "-IdleQueue")
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "1");
.addAttributesEntry(Constants.IDLE_QUEUE_RETENTION_PERIOD, "1");
queueUrl = client.createQueue(createQueueRequest).getQueueUrl();

// May have to wait for up to a minute for the new queue to show up in ListQueues
Expand All @@ -72,7 +73,7 @@ public void idleQueueIsDeleted() throws InterruptedException {
public void updatedHeartBeatTag() throws InterruptedException {
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueNamePrefix + "-HeartbeatTag")
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "60");
.addAttributesEntry(Constants.IDLE_QUEUE_RETENTION_PERIOD, "60");
queueUrl = client.createQueue(createQueueRequest).getQueueUrl();

SendMessageRequest sendMsgRequest = new SendMessageRequest()
Expand Down Expand Up @@ -104,7 +105,7 @@ private String getLastHeartbeatTimestamp() {
public void notUpdatedHeartBeatTag() throws InterruptedException {
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueNamePrefix + "-HeartbeatTag")
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "60");
.addAttributesEntry(Constants.IDLE_QUEUE_RETENTION_PERIOD, "60");
queueUrl = client.createQueue(createQueueRequest).getQueueUrl();

SendMessageRequest sendMsgRequest = new SendMessageRequest()
Expand All @@ -128,7 +129,7 @@ public void recreatingQueues() throws InterruptedException {
String queueName = queueNamePrefix + "-DeletedTooSoon";
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueName)
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "60");
.addAttributesEntry(Constants.IDLE_QUEUE_RETENTION_PERIOD, "60");
queueUrl = client.createQueue(createQueueRequest).getQueueUrl();

QueueUser user = new QueueUser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.sqs.util.Constants;
import org.junit.After;
import org.junit.Test;

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.util.ExceptionAsserter;
import com.amazonaws.services.sqs.util.MockSQS;
Expand Down Expand Up @@ -59,7 +59,7 @@ public void happyPath() throws TimeoutException, InterruptedException, Execution

Message requestMessage = sqs.receiveMessage(queueUrl).getMessages().get(0);
assertEquals(requestMessageBody, requestMessage.getBody());
String responseQueueUrl = requestMessage.getMessageAttributes().get(AmazonSQSRequesterClient.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME).getStringValue();
String responseQueueUrl = requestMessage.getMessageAttributes().get(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME).getStringValue();
assertNotNull(responseQueueUrl);

responderClient.sendResponseMessage(MessageContent.fromMessage(requestMessage), new MessageContent(responseMessageBody));
Expand All @@ -84,7 +84,7 @@ public void timeout() throws TimeoutException, InterruptedException, ExecutionEx

Message requestMessage = sqs.receiveMessage(queueUrl).getMessages().get(0);
assertEquals(requestMessageBody, requestMessage.getBody());
String responseQueueUrl = requestMessage.getMessageAttributes().get(AmazonSQSRequesterClient.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME).getStringValue();
String responseQueueUrl = requestMessage.getMessageAttributes().get(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME).getStringValue();
assertNotNull(responseQueueUrl);

// TODO-RS: Junit 5
Expand Down
Loading

0 comments on commit 898366c

Please sign in to comment.