Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-aws committed Jun 25, 2020
2 parents 156f531 + 536db00 commit 9c1128c
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 86 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target/
.idea
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
language: java
jdk:
- openjdk8
18 changes: 18 additions & 0 deletions buildspec.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: 0.2

phases:
install:
runtime-versions:
java: corretto8
pre_build:
commands:
- echo Nothing to do in the pre_build phase...
build:
commands:
- echo Build started on `date`
- mvn clean install
- echo Running integration tests
- mvn test-compile failsafe:integration-test failsafe:verify
post_build:
commands:
- echo Build completed on `date`
81 changes: 53 additions & 28 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@
<version>2.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -136,34 +147,48 @@
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
<configuration>
<keyname>${gpg.sqs.keyname}</keyname>
<passphraseServerId>gpg.sqs.passphrase</passphraseServerId>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>

<profiles>
<profile>
<id>publishing</id>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
<configuration>
<gpgArguments>
<gpgArgument>--pinentry-mode</gpgArgument>
<gpgArgument>loopback</gpgArgument>
</gpgArguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) {
try {
try {
return receiveBuffer.receiveMessageAsync(request)
.get(request.getWaitTimeSeconds(), TimeUnit.SECONDS);
.get(Optional.ofNullable(request.getWaitTimeSeconds()).orElse(0).longValue(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Fall through to an empty receive
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public ScheduledSQSFutureTask(Callable<T> callable, MessageContent messageConten
this.period = unit.toNanos(period);

messageContent.setMessageAttributesEntry(DELAY_NANOS_ATTRIBUTE_NAME,
longMessageAttributeValue(delay));
longMessageAttributeValue(this.delay));
messageContent.setMessageAttributesEntry(PERIOD_NANOS_ATTRIBUTE_NAME,
longMessageAttributeValue(period));
longMessageAttributeValue(this.period));

this.time = getTime(delay);
this.time = getTime(this.delay);
}

public ScheduledSQSFutureTask(Message message) {
Expand Down Expand Up @@ -122,10 +122,7 @@ public int compareTo(Delayed other) {
public SendMessageRequest toSendMessageRequest() {
SendMessageRequest request = super.toSendMessageRequest();

int sqsDelaySeconds = (int)Math.min(TimeUnit.NANOSECONDS.toSeconds(delay), MAX_SQS_DELAY_SECONDS);
if (sqsDelaySeconds < 0) {
sqsDelaySeconds = 1;
}
int sqsDelaySeconds = Math.max(1, (int)Math.min(TimeUnit.NANOSECONDS.toSeconds(delay), MAX_SQS_DELAY_SECONDS));
request.setDelaySeconds(sqsDelaySeconds);

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ protected String testSuiteName() {

@Before
public void setup() {
String policyString = allowSendMessagePolicy().toJson();
// Use the secondary role for the responder
sqsResponder = new AmazonSQSResponderClient(getBuddyPrincipalClient());

String policyString = allowSendMessagePolicy(getBuddyRoleARN()).toJson();
sqsRequester = new AmazonSQSRequesterClient(sqs, queueNamePrefix,
Collections.singletonMap(QueueAttributeName.Policy.toString(), policyString),
exceptionHandler);
// Use the second account for the responder
sqsResponder = new AmazonSQSResponderClient(getBuddyPrincipalClient());

requestQueueUrl = sqs.createQueue("RequestQueue-" + UUID.randomUUID().toString()).getQueueUrl();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void accessDenied() {

@Test
public void withAccess() {
String policyString = allowSendMessagePolicy().toJson();
String policyString = allowSendMessagePolicy(getBuddyRoleARN()).toJson();
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueNamePrefix + "TestQueueWithAccess")
.withAttributes(Collections.singletonMap(QueueAttributeName.Policy.toString(), policyString));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,22 @@ public void expiringVirtualQueue() throws InterruptedException {
// Expected
}
}

@Test
public void ReceiveMessageWaitTimeSecondsNull() {
CreateQueueRequest request = new CreateQueueRequest()
.withQueueName("ReceiveMessageWaitTimeSecondsNull")
.addAttributesEntry(AmazonSQSVirtualQueuesClient.VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueueUrl)
.addAttributesEntry(AmazonSQSIdleQueueDeletingClient.IDLE_QUEUE_RETENTION_PERIOD, "5");
String virtualQueueUrl = client.createQueue(request).getQueueUrl();

// Do Receive message request with null WaitTimeSeconds.
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
.withQueueUrl(virtualQueueUrl);
try {
assertEquals(0, client.receiveMessage(receiveRequest).getMessages().size());
} catch (NullPointerException npe) {
fail("NPE not expected with null WaitTimeSeconds on ReceiveMessageRequest");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class IdleQueueSweeperIT extends IntegrationTest {

@Before
public void setup() {
requester = AmazonSQSRequesterClientBuilder.standard().withAmazonSQS(sqs).build();
requester = AmazonSQSRequesterClientBuilder.standard().withAmazonSQS(sqs).withIdleQueueSweepingPeriod(0, TimeUnit.SECONDS).build();
responder = AmazonSQSResponderClientBuilder.standard().withAmazonSQS(sqs).build();
sweepingQueueUrl = sqs.createQueue(queueNamePrefix).getQueueUrl();
sweeper = new IdleQueueSweeper(requester, responder, sweepingQueueUrl, queueNamePrefix, 5, TimeUnit.SECONDS, exceptionHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.amazonaws.services.sqs.executors.DeduplicatedRunnable.deduplicated;
import static com.amazonaws.services.sqs.executors.SerializableRunnable.serializable;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -12,7 +14,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
Expand All @@ -39,7 +40,7 @@ public class SQSScheduledExecutorServiceIT extends IntegrationTest {
private static String queueUrl;
private static List<SQSExecutorService> executors = new ArrayList<>();
private static AtomicInteger seedCount = new AtomicInteger();
private static CountDownLatch tasksCompletedLatch;
private static AtomicInteger tasksRemaining;

private static class SQSScheduledExecutorWithAssertions extends SQSScheduledExecutorService implements Serializable {

Expand All @@ -66,10 +67,16 @@ public void setup() {
Collections.emptyMap(), exceptionHandler);
responder = new AmazonSQSResponderClient(sqs);
queueUrl = sqs.createQueue(queueNamePrefix + "-RequestQueue").getQueueUrl();
tasksCompletedLatch = new CountDownLatch(1);
tasksRemaining = new AtomicInteger(1);
executors.clear();
}

private void awaitTasksSeconds(int minimumSeconds, int maximumSeconds) {
await().atLeast(minimumSeconds, TimeUnit.SECONDS).and()
.atMost(maximumSeconds, TimeUnit.SECONDS)
.untilAtomic(tasksRemaining, equalTo(0));
}

@After
public void teardown() {
assertTrue(executors.parallelStream().allMatch(this::shutdownExecutor));
Expand Down Expand Up @@ -110,29 +117,29 @@ private static void sweepParent(Executor executor, int number) {
}

private static void sweepLeaf(Executor executor, int number) {
if (tasksCompletedLatch.getCount() == 0) {
if (tasksRemaining.get() == 0) {
throw new IllegalStateException("Too many leaves swept!");
}
tasksCompletedLatch.countDown();
tasksRemaining.decrementAndGet();
}

@Test
public void singleDelayedTask() throws InterruptedException {
SQSScheduledExecutorService executor = createScheduledExecutor(queueUrl);
executor.delayedExecute(serializable(() -> tasksCompletedLatch.countDown()), 1, TimeUnit.SECONDS);
assertTrue(tasksCompletedLatch.await(5, TimeUnit.SECONDS));
executor.delayedExecute(serializable(() -> tasksRemaining.decrementAndGet()), 1, TimeUnit.SECONDS);
awaitTasksSeconds(1, 5);
}

@Test
public void taskThatSpawnsTasksMultipleExecutors() throws InterruptedException {
tasksCompletedLatch = new CountDownLatch(20);
tasksRemaining = new AtomicInteger(20);
List<SQSScheduledExecutorService> sweepers =
IntStream.range(0, 5)
.mapToObj(x -> createScheduledExecutor(queueUrl))
.collect(Collectors.toList());
sweepers.forEach(executor ->
executor.delayedExecute(deduplicated(() -> seed(executor)), 1, TimeUnit.SECONDS));
assertTrue(tasksCompletedLatch.await(15, TimeUnit.SECONDS));
awaitTasksSeconds(1, 15);
}

private static void slowTask() {
Expand All @@ -141,41 +148,52 @@ private static void slowTask() {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
tasksCompletedLatch.countDown();
tasksRemaining.decrementAndGet();
}

private static void fastTask() {
tasksRemaining.decrementAndGet();
}

@Test
public void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
tasksCompletedLatch = new CountDownLatch(3);
public void scheduleSlowTaskAtFixedRate() throws InterruptedException, ExecutionException {
tasksRemaining = new AtomicInteger(3);
SQSScheduledExecutorService executor = createScheduledExecutor(queueUrl);
Future<?> future = executor.scheduleAtFixedRate(serializable(SQSScheduledExecutorServiceIT::slowTask), 1, 1, TimeUnit.SECONDS);
assertTrue(tasksCompletedLatch.await(15, TimeUnit.SECONDS));
assertFalse(future.isDone());

// Cancel and assert that the future behaves correctly locally...
future.cancel(true);
assertTrue(future.isDone());
assertTrue(future.isCancelled());
// TODO-RS: Switch to JUnit 5
try {
future.get();
fail("Expected CancellationException");
} catch (CancellationException e) {
// Expected
}

// ...and that the message gets purged from the queue
assertTrue(SQSQueueUtils.awaitEmptyQueue(sqs, queueUrl, 10, TimeUnit.SECONDS));
awaitTasksSeconds(3, 15);
assertScheduledTaskCanBeCancelled(future);
}

@Test
public void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
tasksCompletedLatch = new CountDownLatch(3);
public void scheduleSlowTaskWithFixedDelay() throws InterruptedException, ExecutionException {
tasksRemaining = new AtomicInteger(3);
SQSScheduledExecutorService executor = createScheduledExecutor(queueUrl);
Future<?> future = executor.scheduleAtFixedRate(serializable(SQSScheduledExecutorServiceIT::slowTask), 1, 1, TimeUnit.SECONDS);
assertTrue(tasksCompletedLatch.await(10, TimeUnit.SECONDS));
Future<?> future = executor.scheduleWithFixedDelay(serializable(SQSScheduledExecutorServiceIT::slowTask), 1, 1, TimeUnit.SECONDS);
awaitTasksSeconds(9, 15);
assertScheduledTaskCanBeCancelled(future);
}

@Test
public void scheduleFastTaskAtFixedRate() throws InterruptedException, ExecutionException {
tasksRemaining = new AtomicInteger(3);
SQSScheduledExecutorService executor = createScheduledExecutor(queueUrl);
Future<?> future = executor.scheduleAtFixedRate(serializable(SQSScheduledExecutorServiceIT::fastTask), 1, 1, TimeUnit.SECONDS);
awaitTasksSeconds(3, 15);
assertScheduledTaskCanBeCancelled(future);
}

@Test
public void scheduleFastTaskWithFixedDelay() throws InterruptedException, ExecutionException {
tasksRemaining = new AtomicInteger(3);
SQSScheduledExecutorService executor = createScheduledExecutor(queueUrl);
Future<?> future = executor.scheduleWithFixedDelay(serializable(SQSScheduledExecutorServiceIT::fastTask), 1, 1, TimeUnit.SECONDS);
awaitTasksSeconds(3, 15);
assertScheduledTaskCanBeCancelled(future);
}

public void assertScheduledTaskCanBeCancelled(Future<?> future) throws ExecutionException, InterruptedException {
assertFalse(future.isDone());

// Cancel and assert that the future behaves correctly locally...
future.cancel(true);
assertTrue(future.isDone());
Expand All @@ -187,7 +205,7 @@ public void scheduleWithFixedDelay() throws InterruptedException, ExecutionExcep
} catch (CancellationException e) {
// Expected
}

// ...and that the message gets purged from the queue
assertTrue(SQSQueueUtils.awaitEmptyQueue(sqs, queueUrl, 10, TimeUnit.SECONDS));
}
Expand Down
Loading

0 comments on commit 9c1128c

Please sign in to comment.