Skip to content

Commit

Permalink
Merge pull request #23 from hcalsos/master
Browse files Browse the repository at this point in the history
IdleQueueHeartbeatInterval configurable from builder
  • Loading branch information
adam-aws committed Jun 25, 2020
2 parents 536db00 + 9c1128c commit e6ffd9a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
* as this client uses {@link #listQueues(ListQueuesRequest)} to sweep them.
* <p>
* This client uses a heartbeating mechanism based on queue tags. Making API calls to queues
* through this client causes tags on those queues to be refreshed every 5 seconds. If the process
* through this client causes tags on those queues to be refreshed every 5 seconds (by default,
* heartbeating mechanism is configurable). If the process
* using a client shuts down uncleanly, other client instances using the same queue prefix will
* detect that its queue(s) are idle and delete them.
*/
Expand All @@ -70,10 +71,10 @@ class AmazonSQSIdleQueueDeletingClient extends AbstractAmazonSQSClientWrapper {
public static final String IDLE_QUEUE_RETENTION_PERIOD = "IdleQueueRetentionPeriodSeconds";
public static final long MINIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS = 1;
public static final long MAXIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS = TimeUnit.MINUTES.toSeconds(5);

public static final long HEARTBEAT_INTERVAL_SECONDS_DEFAULT = 5;
public static final long HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE = 1;

static final String IDLE_QUEUE_RETENTION_PERIOD_TAG = "__IdleQueueRetentionPeriodSeconds";
// TODO-RS: Configuration
private static final long HEARTBEAT_INTERVAL_SECONDS = 5;

private static final String SWEEPING_QUEUE_DLQ_SUFFIX = "_DLQ";
private static final long DLQ_MESSAGE_RETENTION_PERIOD = TimeUnit.DAYS.toSeconds(14);
Expand All @@ -98,19 +99,37 @@ private QueueMetadata(String name, String queueUrl, Map<String, String> attribut
new DaemonThreadFactory("AmazonSQSIdleQueueDeletingClient"));

private final String queueNamePrefix;
private final long heartbeatIntervalSeconds;

private final Map<String, QueueMetadata> queues = new ConcurrentHashMap<>();

private IdleQueueSweeper idleQueueSweeper;
private String deadLetterQueueUrl;

public AmazonSQSIdleQueueDeletingClient(AmazonSQS sqs, String queueNamePrefix) {
public AmazonSQSIdleQueueDeletingClient(AmazonSQS sqs, String queueNamePrefix, Long heartbeatIntervalSeconds) {
super(sqs);

if (queueNamePrefix.isEmpty()) {
throw new IllegalArgumentException("Queue name prefix must be non-empty");
}

this.queueNamePrefix = queueNamePrefix;

if (heartbeatIntervalSeconds != null) {
if (heartbeatIntervalSeconds < HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE) {
throw new IllegalArgumentException("Heartbeat Interval Seconds: " +
heartbeatIntervalSeconds +
" must be equal to or bigger than " +
HEARTBEAT_INTERVAL_SECONDS_MIN_VALUE);
}
this.heartbeatIntervalSeconds = heartbeatIntervalSeconds;
} else {
this.heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
}
}

public AmazonSQSIdleQueueDeletingClient(AmazonSQS sqs, String queueNamePrefix) {
this(sqs, queueNamePrefix, null);
}

protected synchronized void startSweeper(AmazonSQSRequester requester, AmazonSQSResponder responder,
Expand Down Expand Up @@ -193,7 +212,7 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
queues.put(queueUrl, metadata);

metadata.heartbeater = executor.scheduleAtFixedRate(() -> heartbeatToQueue(queueUrl),
0, HEARTBEAT_INTERVAL_SECONDS, TimeUnit.SECONDS);
0, heartbeatIntervalSeconds, TimeUnit.SECONDS);

return result;
}
Expand Down Expand Up @@ -278,7 +297,7 @@ private void heartbeatToQueueIfNecessary(String queueUrl) {
QueueMetadata queueMetadata = queues.get(queueUrl);
if (queueMetadata != null) {
Long lastHeartbeat = queueMetadata.heartbeatTimestamp;
if (lastHeartbeat == null || (System.currentTimeMillis() - lastHeartbeat) > 2 * HEARTBEAT_INTERVAL_SECONDS) {
if (lastHeartbeat == null || (System.currentTimeMillis() - lastHeartbeat) > 2 * heartbeatIntervalSeconds) {
return;
}
heartbeatToQueue(queueUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
import java.util.concurrent.TimeUnit;

public class AmazonSQSRequesterClientBuilder {

private Optional<AmazonSQS> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";

private Map<String, String> queueAttributes = Collections.emptyMap();

private int idleQueueSweepingPeriod = 5;
private long queueHeartbeatInterval = AmazonSQSIdleQueueDeletingClient.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
private TimeUnit idleQueueSweepingTimeUnit = TimeUnit.MINUTES;

private AmazonSQSRequesterClientBuilder() {
Expand All @@ -26,19 +27,19 @@ private AmazonSQSRequesterClientBuilder() {
public static AmazonSQSRequesterClientBuilder standard() {
return new AmazonSQSRequesterClientBuilder();
}

public static AmazonSQSRequester defaultClient() {
return standard().build();
}

public Optional<AmazonSQS> getAmazonSQS() {
return customSQS;
}

public void setAmazonSQS(AmazonSQS sqs) {
this.customSQS = Optional.of(sqs);
}

public AmazonSQSRequesterClientBuilder withAmazonSQS(AmazonSQS sqs) {
setAmazonSQS(sqs);
return this;
Expand All @@ -47,24 +48,24 @@ public AmazonSQSRequesterClientBuilder withAmazonSQS(AmazonSQS sqs) {
public String getInternalQueuePrefix() {
return internalQueuePrefix;
}

public void setInternalQueuePrefix(String internalQueuePrefix) {
this.internalQueuePrefix = internalQueuePrefix;
}

public AmazonSQSRequesterClientBuilder withInternalQueuePrefix(String internalQueuePrefix) {
setInternalQueuePrefix(internalQueuePrefix);
return this;
}

public Map<String, String> getQueueAttributes() {
return Collections.unmodifiableMap(queueAttributes);
}

public void setQueueAttributes(Map<String, String> queueAttributes) {
this.queueAttributes = new HashMap<>(queueAttributes);
}

public AmazonSQSRequesterClientBuilder withQueueAttributes(Map<String, String> queueAttributes) {
setQueueAttributes(queueAttributes);
return this;
Expand All @@ -77,17 +78,30 @@ public int getIdleQueueSweepingPeriod() {
public TimeUnit getIdleQueueSweepingTimeUnit() {
return idleQueueSweepingTimeUnit;
}

public void setIdleQueueSweepingPeriod(int period, TimeUnit timeUnit) {
this.idleQueueSweepingPeriod = period;
this.idleQueueSweepingTimeUnit = timeUnit;
}

public AmazonSQSRequesterClientBuilder withIdleQueueSweepingPeriod(int period, TimeUnit timeUnit) {
setIdleQueueSweepingPeriod(period, timeUnit);
return this;
}


public long getQueueHeartbeatInterval() {
return queueHeartbeatInterval;
}

public void setQueueHeartbeatInterval(long heartbeatIntervalSeconds) {
this.queueHeartbeatInterval = heartbeatIntervalSeconds;
}

public AmazonSQSRequesterClientBuilder withQueueHeartbeatInterval(long heartbeatIntervalSeconds) {
setQueueHeartbeatInterval(heartbeatIntervalSeconds);
return this;
}

public AmazonSQSRequester build() {
return AmazonSQSTemporaryQueuesClient.make(this).getRequester();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class AmazonSQSResponderClientBuilder {
private Optional<AmazonSQS> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";
private long queueHeartbeatInterval = AmazonSQSIdleQueueDeletingClient.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;

private AmazonSQSResponderClientBuilder() {
}
Expand Down Expand Up @@ -40,8 +41,21 @@ public AmazonSQSResponderClientBuilder withInternalQueuePrefix(String internalQu
setInternalQueuePrefix(internalQueuePrefix);
return this;
}

/**

public long getQueueHeartbeatInterval() {
return queueHeartbeatInterval;
}

public void setQueueHeartbeatInterval(long queueHeartbeatInterval) {
this.queueHeartbeatInterval = queueHeartbeatInterval;
}

public AmazonSQSResponderClientBuilder withQueueHeartbeatInterval(long heartbeatIntervalSeconds) {
setQueueHeartbeatInterval(heartbeatIntervalSeconds);
return this;
}

/**
* @return Create new instance of builder with all defaults set.
*/
public static AmazonSQSResponderClientBuilder standard() {
Expand All @@ -50,7 +64,7 @@ public static AmazonSQSResponderClientBuilder standard() {

public AmazonSQSResponder build() {
AmazonSQS sqs = customSQS.orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQS deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix);
AmazonSQS deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix, queueHeartbeatInterval);
AmazonSQS virtualQueuesClient = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter).build();
return new AmazonSQSResponderClient(virtualQueuesClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private AmazonSQSTemporaryQueuesClient(AmazonSQS virtualizer, AmazonSQSIdleQueue

public static AmazonSQSTemporaryQueuesClient make(AmazonSQSRequesterClientBuilder builder) {
AmazonSQS sqs = builder.getAmazonSQS().orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQSIdleQueueDeletingClient deleter = new AmazonSQSIdleQueueDeletingClient(sqs, builder.getInternalQueuePrefix());
AmazonSQSIdleQueueDeletingClient deleter = new AmazonSQSIdleQueueDeletingClient(sqs, builder.getInternalQueuePrefix(), builder.getQueueHeartbeatInterval());
AmazonSQS virtualizer = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter).build();
AmazonSQSTemporaryQueuesClient temporaryQueuesClient = new AmazonSQSTemporaryQueuesClient(virtualizer, deleter, builder.getInternalQueuePrefix());
AmazonSQSRequesterClient requester = new AmazonSQSRequesterClient(temporaryQueuesClient, builder.getInternalQueuePrefix(), builder.getQueueAttributes());
Expand Down

0 comments on commit e6ffd9a

Please sign in to comment.