Skip to content

Commit

Permalink
Made some changes after PR comments:
Browse files Browse the repository at this point in the history
Removed tabs in 1 file.
Removed idle from queueSweepingPeriod variable
Moved default heartbeat interval to builder as constant.
  • Loading branch information
hcalsos committed May 12, 2020
1 parent d5ef6eb commit 9917193
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class AmazonSQSIdleQueueDeletingClient extends AbstractAmazonSQSClientWrapper {
public static final long MAXIMUM_IDLE_QUEUE_RETENTION_PERIOD_SECONDS = TimeUnit.MINUTES.toSeconds(5);

static final String IDLE_QUEUE_RETENTION_PERIOD_TAG = "__IdleQueueRetentionPeriodSeconds";
// TODO-RS: Configuration
private final long HEARTBEAT_INTERVAL_SECONDS;

private static final String SWEEPING_QUEUE_DLQ_SUFFIX = "_DLQ";
private static final long DLQ_MESSAGE_RETENTION_PERIOD = TimeUnit.DAYS.toSeconds(14);
Expand All @@ -98,6 +96,7 @@ private QueueMetadata(String name, String queueUrl, Map<String, String> attribut
new DaemonThreadFactory("AmazonSQSIdleQueueDeletingClient"));

private final String queueNamePrefix;
private final long heartbeatInterval;

private final Map<String, QueueMetadata> queues = new ConcurrentHashMap<>();

Expand All @@ -111,8 +110,7 @@ public AmazonSQSIdleQueueDeletingClient(AmazonSQS sqs, String queueNamePrefix, L
throw new IllegalArgumentException("Queue name prefix must be non-empty");
}
this.queueNamePrefix = queueNamePrefix;
this.HEARTBEAT_INTERVAL_SECONDS = heartbeatInterval != null ? heartbeatInterval : 5;

this.heartbeatInterval = heartbeatInterval != null ? heartbeatInterval : AmazonSQSRequesterClientBuilder.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
}

public AmazonSQSIdleQueueDeletingClient(AmazonSQS sqs, String queueNamePrefix) {
Expand Down Expand Up @@ -199,7 +197,7 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
queues.put(queueUrl, metadata);

metadata.heartbeater = executor.scheduleAtFixedRate(() -> heartbeatToQueue(queueUrl),
0, HEARTBEAT_INTERVAL_SECONDS, TimeUnit.SECONDS);
0, heartbeatInterval, TimeUnit.SECONDS);

return result;
}
Expand Down Expand Up @@ -284,7 +282,7 @@ private void heartbeatToQueueIfNecessary(String queueUrl) {
QueueMetadata queueMetadata = queues.get(queueUrl);
if (queueMetadata != null) {
Long lastHeartbeat = queueMetadata.heartbeatTimestamp;
if (lastHeartbeat == null || (System.currentTimeMillis() - lastHeartbeat) > 2 * HEARTBEAT_INTERVAL_SECONDS) {
if (lastHeartbeat == null || (System.currentTimeMillis() - lastHeartbeat) > 2 * heartbeatInterval) {
return;
}
heartbeatToQueue(queueUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
import java.util.concurrent.TimeUnit;

public class AmazonSQSRequesterClientBuilder {


public static final long HEARTBEAT_INTERVAL_SECONDS_DEFAULT = 5;
private Optional<AmazonSQS> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";

private Map<String, String> queueAttributes = Collections.emptyMap();

private int idleQueueSweepingPeriod = 5;
private long idleQueueHeartbeatInterval = 5;
private long queueHeartbeatInterval = HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
private TimeUnit idleQueueSweepingTimeUnit = TimeUnit.MINUTES;

private AmazonSQSRequesterClientBuilder() {
Expand All @@ -27,19 +28,19 @@ private AmazonSQSRequesterClientBuilder() {
public static AmazonSQSRequesterClientBuilder standard() {
return new AmazonSQSRequesterClientBuilder();
}

public static AmazonSQSRequester defaultClient() {
return standard().build();
}

public Optional<AmazonSQS> getAmazonSQS() {
return customSQS;
}

public void setAmazonSQS(AmazonSQS sqs) {
this.customSQS = Optional.of(sqs);
}

public AmazonSQSRequesterClientBuilder withAmazonSQS(AmazonSQS sqs) {
setAmazonSQS(sqs);
return this;
Expand All @@ -48,24 +49,24 @@ public AmazonSQSRequesterClientBuilder withAmazonSQS(AmazonSQS sqs) {
public String getInternalQueuePrefix() {
return internalQueuePrefix;
}

public void setInternalQueuePrefix(String internalQueuePrefix) {
this.internalQueuePrefix = internalQueuePrefix;
}

public AmazonSQSRequesterClientBuilder withInternalQueuePrefix(String internalQueuePrefix) {
setInternalQueuePrefix(internalQueuePrefix);
return this;
}

public Map<String, String> getQueueAttributes() {
return Collections.unmodifiableMap(queueAttributes);
}

public void setQueueAttributes(Map<String, String> queueAttributes) {
this.queueAttributes = new HashMap<>(queueAttributes);
}

public AmazonSQSRequesterClientBuilder withQueueAttributes(Map<String, String> queueAttributes) {
setQueueAttributes(queueAttributes);
return this;
Expand All @@ -78,29 +79,29 @@ public int getIdleQueueSweepingPeriod() {
public TimeUnit getIdleQueueSweepingTimeUnit() {
return idleQueueSweepingTimeUnit;
}

public void setIdleQueueSweepingPeriod(int period, TimeUnit timeUnit) {
this.idleQueueSweepingPeriod = period;
this.idleQueueSweepingTimeUnit = timeUnit;
}

public AmazonSQSRequesterClientBuilder withIdleQueueSweepingPeriod(int period, TimeUnit timeUnit) {
setIdleQueueSweepingPeriod(period, timeUnit);
return this;
}

public long getIdleQueueHeartbeatInterval() {
return idleQueueHeartbeatInterval;
}
public long getQueueHeartbeatInterval() {
return queueHeartbeatInterval;
}

public void setIdleQueueHeartbeatInterval(long heartbeatInterval) {
this.idleQueueHeartbeatInterval = heartbeatInterval;
public void setQueueHeartbeatInterval(long heartbeatInterval) {
this.queueHeartbeatInterval = heartbeatInterval;
}

public AmazonSQSRequesterClientBuilder withIdleQueueHeartbeatInterval(long heartbeatInterval) {
setIdleQueueHeartbeatInterval(heartbeatInterval);
public AmazonSQSRequesterClientBuilder withQueueHeartbeatInterval(long heartbeatInterval) {
setQueueHeartbeatInterval(heartbeatInterval);
return this;
}
}

public AmazonSQSRequester build() {
return AmazonSQSTemporaryQueuesClient.make(this).getRequester();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class AmazonSQSResponderClientBuilder {
private Optional<AmazonSQS> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";
private long idleQueueHeartbeatInterval = 5;
private long queueHeartbeatInterval = 5;

private AmazonSQSResponderClientBuilder() {
}
Expand Down Expand Up @@ -42,16 +42,16 @@ public AmazonSQSResponderClientBuilder withInternalQueuePrefix(String internalQu
return this;
}

public long getIdleQueueHeartbeatInterval() {
return idleQueueHeartbeatInterval;
public long getQueueHeartbeatInterval() {
return queueHeartbeatInterval;
}

public void setIdleQueueHeartbeatInterval(long idleQueueHeartbeatInterval) {
this.idleQueueHeartbeatInterval = idleQueueHeartbeatInterval;
public void setQueueHeartbeatInterval(long queueHeartbeatInterval) {
this.queueHeartbeatInterval = queueHeartbeatInterval;
}

public AmazonSQSResponderClientBuilder withIdleQueueHeartbeatInterval(long heartbeatInterval) {
setIdleQueueHeartbeatInterval(heartbeatInterval);
public AmazonSQSResponderClientBuilder withQueueHeartbeatInterval(long heartbeatInterval) {
setQueueHeartbeatInterval(heartbeatInterval);
return this;
}

Expand All @@ -64,7 +64,7 @@ public static AmazonSQSResponderClientBuilder standard() {

public AmazonSQSResponder build() {
AmazonSQS sqs = customSQS.orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQS deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix, idleQueueHeartbeatInterval);
AmazonSQS deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix, queueHeartbeatInterval);
AmazonSQS virtualQueuesClient = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter).build();
return new AmazonSQSResponderClient(virtualQueuesClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private AmazonSQSTemporaryQueuesClient(AmazonSQS virtualizer, AmazonSQSIdleQueue

public static AmazonSQSTemporaryQueuesClient make(AmazonSQSRequesterClientBuilder builder) {
AmazonSQS sqs = builder.getAmazonSQS().orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQSIdleQueueDeletingClient deleter = new AmazonSQSIdleQueueDeletingClient(sqs, builder.getInternalQueuePrefix(), builder.getIdleQueueHeartbeatInterval());
AmazonSQSIdleQueueDeletingClient deleter = new AmazonSQSIdleQueueDeletingClient(sqs, builder.getInternalQueuePrefix(), builder.getQueueHeartbeatInterval());
AmazonSQS virtualizer = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter).build();
AmazonSQSTemporaryQueuesClient temporaryQueuesClient = new AmazonSQSTemporaryQueuesClient(virtualizer, deleter, builder.getInternalQueuePrefix());
AmazonSQSRequesterClient requester = new AmazonSQSRequesterClient(temporaryQueuesClient, builder.getInternalQueuePrefix(), builder.getQueueAttributes());
Expand Down

0 comments on commit 9917193

Please sign in to comment.