Skip to content

Commit

Permalink
[hotfix] Migrate failure handling tests to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhurk committed Jul 11, 2022
1 parent 387b2a4 commit 777d780
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,19 @@
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ExecutionFailureHandler}. */
public class ExecutionFailureHandlerTest extends TestLogger {
class ExecutionFailureHandlerTest {

private static final long RESTART_DELAY_MS = 1234L;

Expand All @@ -56,8 +49,8 @@ public class ExecutionFailureHandlerTest extends TestLogger {

private ExecutionFailureHandler executionFailureHandler;

@Before
public void setUp() {
@BeforeEach
void setUp() {
TestingSchedulingTopology topology = new TestingSchedulingTopology();
topology.newExecutionVertex();
schedulingTopology = topology;
Expand All @@ -71,7 +64,7 @@ public void setUp() {

/** Tests the case that task restarting is accepted. */
@Test
public void testNormalFailureHandling() {
void testNormalFailureHandling() {
final Set<ExecutionVertexID> tasksToRestart =
Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
failoverStrategy.setTasksToRestart(tasksToRestart);
Expand All @@ -84,17 +77,17 @@ public void testNormalFailureHandling() {
new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);

// verify results
assertTrue(result.canRestart());
assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
assertEquals(tasksToRestart, result.getVerticesToRestart());
assertThat(result.getError(), is(cause));
assertThat(result.getTimestamp(), is(timestamp));
assertEquals(1, executionFailureHandler.getNumberOfRestarts());
assertThat(result.canRestart()).isTrue();
assertThat(result.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
assertThat(result.getError()).isSameAs(cause);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
}

/** Tests the case that task restarting is suppressed. */
@Test
public void testRestartingSuppressedFailureHandlingResult() {
void testRestartingSuppressedFailureHandlingResult() {
// restart strategy suppresses restarting
backoffTimeStrategy.setCanRestart(false);

Expand All @@ -106,28 +99,25 @@ public void testRestartingSuppressedFailureHandlingResult() {
new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);

// verify results
assertFalse(result.canRestart());
assertThat(result.getError(), containsCause(error));
assertThat(result.getTimestamp(), is(timestamp));
assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
try {
result.getVerticesToRestart();
fail("get tasks to restart is not allowed when restarting is suppressed");
} catch (IllegalStateException ex) {
// expected
}
try {
result.getRestartDelayMS();
fail("get restart delay is not allowed when restarting is suppressed");
} catch (IllegalStateException ex) {
// expected
}
assertEquals(0, executionFailureHandler.getNumberOfRestarts());
assertThat(result.canRestart()).isFalse();
assertThat(result.getError()).hasCause(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isFalse();

assertThatThrownBy(result::getVerticesToRestart)
.as("getVerticesToRestart is not allowed when restarting is suppressed")
.isInstanceOf(IllegalStateException.class);

assertThatThrownBy(result::getRestartDelayMS)
.as("getRestartDelayMS is not allowed when restarting is suppressed")
.isInstanceOf(IllegalStateException.class);

assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}

/** Tests the case that the failure is non-recoverable type. */
@Test
public void testNonRecoverableFailureHandlingResult() {
void testNonRecoverableFailureHandlingResult() {
// trigger an unrecoverable task failure
final Throwable error =
new Exception(new SuppressRestartsException(new Exception("test failure")));
Expand All @@ -137,56 +127,55 @@ public void testNonRecoverableFailureHandlingResult() {
new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);

// verify results
assertFalse(result.canRestart());
assertNotNull(result.getError());
assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
assertThat(result.getTimestamp(), is(timestamp));
try {
result.getVerticesToRestart();
fail("get tasks to restart is not allowed when restarting is suppressed");
} catch (IllegalStateException ex) {
// expected
}
try {
result.getRestartDelayMS();
fail("get restart delay is not allowed when restarting is suppressed");
} catch (IllegalStateException ex) {
// expected
}
assertEquals(0, executionFailureHandler.getNumberOfRestarts());
assertThat(result.canRestart()).isFalse();
assertThat(result.getError()).isNotNull();
assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isTrue();
assertThat(result.getTimestamp()).isEqualTo(timestamp);

assertThatThrownBy(result::getVerticesToRestart)
.as("getVerticesToRestart is not allowed when restarting is suppressed")
.isInstanceOf(IllegalStateException.class);

assertThatThrownBy(result::getRestartDelayMS)
.as("getRestartDelayMS is not allowed when restarting is suppressed")
.isInstanceOf(IllegalStateException.class);

assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}

/** Tests the check for unrecoverable error. */
@Test
public void testUnrecoverableErrorCheck() {
void testUnrecoverableErrorCheck() {
// normal error
assertFalse(ExecutionFailureHandler.isUnrecoverableError(new Exception()));
assertThat(ExecutionFailureHandler.isUnrecoverableError(new Exception())).isFalse();

// direct unrecoverable error
assertTrue(
ExecutionFailureHandler.isUnrecoverableError(
new SuppressRestartsException(new Exception())));
assertThat(
ExecutionFailureHandler.isUnrecoverableError(
new SuppressRestartsException(new Exception())))
.isTrue();

// nested unrecoverable error
assertTrue(
ExecutionFailureHandler.isUnrecoverableError(
new Exception(new SuppressRestartsException(new Exception()))));
assertThat(
ExecutionFailureHandler.isUnrecoverableError(
new Exception(new SuppressRestartsException(new Exception()))))
.isTrue();
}

@Test
public void testGlobalFailureHandling() {
void testGlobalFailureHandling() {
final Throwable error = new Exception("Expected test failure");
final long timestamp = System.currentTimeMillis();
final FailureHandlingResult result =
executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp);

assertEquals(
IterableUtils.toStream(schedulingTopology.getVertices())
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet()),
result.getVerticesToRestart());
assertThat(result.getError(), is(error));
assertThat(result.getTimestamp(), is(timestamp));
assertThat(result.getVerticesToRestart())
.isEqualTo(
IterableUtils.toStream(schedulingTopology.getVertices())
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet()));
assertThat(result.getError()).isSameAs(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,21 @@

import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.Set;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.IsSame.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link FailureHandlingResult}. */
public class FailureHandlingResultTest extends TestLogger {
class FailureHandlingResultTest {

/** Tests normal FailureHandlingResult. */
@Test
public void testNormalFailureHandlingResult() {
void testNormalFailureHandlingResult() {
// create a normal FailureHandlingResult
ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
Set<ExecutionVertexID> tasks = new HashSet<>();
Expand All @@ -52,39 +46,35 @@ public void testNormalFailureHandlingResult() {
FailureHandlingResult.restartable(
executionVertexID, error, timestamp, tasks, delay, false);

assertTrue(result.canRestart());
assertEquals(delay, result.getRestartDelayMS());
assertEquals(tasks, result.getVerticesToRestart());
assertThat(result.getError(), sameInstance(error));
assertThat(result.getTimestamp(), is(timestamp));
assertTrue(result.getExecutionVertexIdOfFailedTask().isPresent());
assertThat(result.getExecutionVertexIdOfFailedTask().get(), is(executionVertexID));
assertThat(result.canRestart()).isTrue();
assertThat(delay).isEqualTo(result.getRestartDelayMS());
assertThat(tasks).isEqualTo(result.getVerticesToRestart());
assertThat(result.getError()).isSameAs(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
assertThat(result.getExecutionVertexIdOfFailedTask().isPresent()).isTrue();
assertThat(result.getExecutionVertexIdOfFailedTask().get()).isEqualTo(executionVertexID);
}

/** Tests FailureHandlingResult which suppresses restarts. */
@Test
public void testRestartingSuppressedFailureHandlingResultWithNoCausingExecutionVertexId() {
void testRestartingSuppressedFailureHandlingResultWithNoCausingExecutionVertexId() {
// create a FailureHandlingResult with error
Throwable error = new Exception("test error");
long timestamp = System.currentTimeMillis();
FailureHandlingResult result =
FailureHandlingResult.unrecoverable(null, error, timestamp, false);

assertFalse(result.canRestart());
assertThat(result.getError(), sameInstance(error));
assertThat(result.getTimestamp(), is(timestamp));
assertFalse(result.getExecutionVertexIdOfFailedTask().isPresent());
try {
result.getVerticesToRestart();
fail("get tasks to restart is not allowed when restarting is suppressed");
} catch (IllegalStateException ex) {
// expected
}
try {
result.getRestartDelayMS();
fail("get restart delay is not allowed when restarting is suppressed");
} catch (IllegalStateException ex) {
// expected
}
assertThat(result.canRestart()).isFalse();
assertThat(result.getError()).isSameAs(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
assertThat(result.getExecutionVertexIdOfFailedTask().isPresent()).isFalse();

assertThatThrownBy(result::getVerticesToRestart)
.as("getVerticesToRestart is not allowed when restarting is suppressed")
.isInstanceOf(IllegalStateException.class);

assertThatThrownBy(result::getRestartDelayMS)
.as("getRestartDelayMS is not allowed when restarting is suppressed")
.isInstanceOf(IllegalStateException.class);
}
}
Loading

0 comments on commit 777d780

Please sign in to comment.