Skip to content

Commit

Permalink
SAMZA-2504: Improve Container Placement Flaky Test & Running Time
Browse files Browse the repository at this point in the history
Improvement [Bug fix]:

- Fix a flaky test for Container Placements on Request status
- Improve the running time of Test suite from 40 secs to under 4 seconds

API changes: None

Upgrade Instructions: None

Usage Instructions: None

Author: Sanil15 <[email protected]>

Reviewers: mynameborat <[email protected]>

Closes #1376 from Sanil15/SAMZA-2504

(cherry picked from commit bcee407)
Signed-off-by: mynameborat <[email protected]>
  • Loading branch information
Sanil15 authored and mynameborat committed Jun 8, 2020
1 parent 1a05c48 commit 6fe7efd
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager.container.placement;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.clustermanager.ContainerProcessManager;
import org.apache.samza.config.ApplicationConfig;
Expand Down Expand Up @@ -50,14 +51,33 @@ public class ContainerPlacementRequestAllocator implements Runnable {
* RunId of the app
*/
private final String appRunId;

/**
* Sleep time for container placement handler thread
*/
private final int containerPlacementHandlerSleepMs;
/**
 * Creates an allocator whose handler thread sleeps for the default interval
 * ({@code DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS}) between iterations.
 *
 * @param containerPlacementMetadataStore metadata store used by this allocator; must not be null
 * @param manager the {@link ContainerProcessManager} used by this allocator; must not be null
 * @param config application config from which the app run id is read
 * @throws NullPointerException if {@code containerPlacementMetadataStore} or {@code manager} is null
 */
public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager manager, ApplicationConfig config) {
  // Delegate to the four-argument constructor so argument validation and field initialization live in one place.
  this(containerPlacementMetadataStore, manager, config, DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
}

/**
 * Creates an allocator with a caller-supplied sleep interval for the handler thread.
 *
 * Should only be used for testing. It cannot be made package private because the end-to-end integration test
 * needs package-private methods that live in org.apache.samza.clustermanager.
 *
 * @param containerPlacementMetadataStore metadata store used by this allocator; must not be null
 * @param manager the {@link ContainerProcessManager} used by this allocator; must not be null
 * @param config application config from which the app run id is read; must not be null
 * @param containerPlacementHandlerSleepMs time in milliseconds the handler thread sleeps between iterations;
 *                                         must not be negative
 * @throws NullPointerException if any of the object arguments is null
 * @throws IllegalArgumentException if {@code containerPlacementHandlerSleepMs} is negative
 */
@VisibleForTesting
public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager manager, ApplicationConfig config, int containerPlacementHandlerSleepMs) {
  Preconditions.checkNotNull(containerPlacementMetadataStore, "containerPlacementMetadataStore cannot be null");
  Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be null");
  // config is dereferenced below; fail with a descriptive message instead of a bare NullPointerException.
  Preconditions.checkNotNull(config, "ApplicationConfig cannot be null");
  // Thread.sleep rejects negative values; fail fast here rather than later on the allocator thread.
  Preconditions.checkArgument(containerPlacementHandlerSleepMs >= 0,
      "containerPlacementHandlerSleepMs cannot be negative");
  this.containerProcessManager = manager;
  this.containerPlacementMetadataStore = containerPlacementMetadataStore;
  this.isRunning = true;
  this.appRunId = config.getRunId();
  this.containerPlacementHandlerSleepMs = containerPlacementHandlerSleepMs;
}

@Override
Expand All @@ -75,7 +95,7 @@ public void run() {
containerPlacementMetadataStore.deleteAllContainerPlacementMessages(message.getUuid());
}
}
Thread.sleep(DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
Thread.sleep(containerPlacementHandlerSleepMs);
} catch (InterruptedException e) {
LOG.warn("Got InterruptedException in ContainerPlacementRequestAllocator thread.", e);
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@

/**
* Set of Integration tests for container placement actions
*
* Please note that semaphores are used wherever possible. Some Thread.sleep calls remain so the main thread can poll for
* state changes to atomic variables or synchronized metadata objects, because it is difficult to plug semaphores into
* those pieces of logic.
*/
@RunWith(MockitoJUnitRunner.class)
public class TestContainerPlacementActions {
Expand Down Expand Up @@ -275,7 +279,7 @@ public Void answer(InvocationOnMock invocation) {
public void testActionQueuingForConsecutivePlacementActions() throws Exception {
// Spawn a Request Allocator Thread
ContainerPlacementRequestAllocator requestAllocator =
new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100);
Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");

requestAllocatorThread.start();
Expand Down Expand Up @@ -345,7 +349,7 @@ public Void answer(InvocationOnMock invocation) {
== ContainerPlacementMessage.StatusCode.SUCCEEDED) {
break;
}
Thread.sleep(Duration.ofSeconds(5).toMillis());
Thread.sleep(100);
}

assertEquals(state.preferredHostRequests.get(), 4);
Expand Down Expand Up @@ -647,8 +651,9 @@ public Void answer(InvocationOnMock invocation) {
fail("timed out waiting for the containers to start");
}

// Wait for both the containers to be in running state
while (state.runningProcessors.size() != 2) {
// Wait for both the containers to be in running state & control action metadata to succeed
while (state.runningProcessors.size() != 2
&& metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
Thread.sleep(100);
}

Expand All @@ -660,8 +665,6 @@ public Void answer(InvocationOnMock invocation) {
assertEquals(state.anyHostRequests.get(), 0);
// Failed processors must be empty
assertEquals(state.failedProcessors.size(), 0);
// Control Action should be success in this case
assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
}

@Test(timeout = 10000)
Expand Down Expand Up @@ -850,8 +853,9 @@ public void testContainerSuccessfulMoveActionWithStandbyEnabled() throws Excepti

// Spawn a Request Allocator Thread
ContainerPlacementRequestAllocator requestAllocator =
new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100);
Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");

requestAllocatorThread.start();

doAnswer(new Answer<Void>() {
Expand Down Expand Up @@ -923,7 +927,7 @@ public Void answer(InvocationOnMock invocation) {
== ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
break;
}
Thread.sleep(Duration.ofSeconds(5).toMillis());
Thread.sleep(100);
}

// App running state should remain the same
Expand Down Expand Up @@ -960,7 +964,7 @@ public Void answer(InvocationOnMock invocation) {
== ContainerPlacementMessage.StatusCode.SUCCEEDED) {
break;
}
Thread.sleep(Duration.ofSeconds(5).toMillis());
Thread.sleep(100);
}

assertEquals(4, state.runningProcessors.size());
Expand Down

0 comments on commit 6fe7efd

Please sign in to comment.