Skip to content

Commit

Permalink
[FLINK-10530][tests] Harden ProcessFailureCancelingITCase and Abstrac…
Browse files Browse the repository at this point in the history
…tTaskManagerProcessFailureRecovery

The problem is that the Dispatcher actor is started before it gains leadership. When using the
standalone high availability services, we don't wait until the Dispatcher has confirmed its
leader session id; we only wait until the actor has become available. As a result, it can happen that
we try to send an RPC message to the Dispatcher before it has actually set its leader session id.

This commit changes the above-mentioned tests to use the ZooKeeper-based HA mode. With that, we
wait until the leader session id has been confirmed.

This closes apache#6827.
  • Loading branch information
tillrohrmann committed Oct 15, 2018
1 parent 9770e78 commit bbee77a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@

package org.apache.flink.test.recovery;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.util.NetUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
Expand All @@ -42,6 +43,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
Expand Down Expand Up @@ -76,6 +79,9 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
@Rule
public final BlobServerResource blobServerResource = new BlobServerResource();

@Rule
public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

@Test
public void testTaskManagerProcessFailure() throws Exception {

Expand All @@ -89,18 +95,19 @@ public void testTaskManagerProcessFailure() throws Exception {

File coordinateTempDir = null;

final int jobManagerPort = NetUtils.getAvailablePort();
final int restPort = NetUtils.getAvailablePort();

Configuration jmConfig = new Configuration();
jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
jmConfig.setInteger(RestOptions.PORT, restPort);

try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) {
Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);

try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config)) {
// check that we run this test only if the java command
// is available on this machine
String javaCommand = getJavaCommandPath();
Expand All @@ -119,21 +126,28 @@ public void testTaskManagerProcessFailure() throws Exception {

clusterEntrypoint.startCluster();

final Map<String, String> keyValues = config.toMap();
final ArrayList<String> commands = new ArrayList<>((keyValues.size() << 1) + 8);

// the TaskManager java command
String[] command = new String[] {
javaCommand,
"-Dlog.level=DEBUG",
"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
"-Xms80m", "-Xmx80m",
"-classpath", getCurrentClasspath(),
TaskExecutorProcessEntryPoint.class.getName(),
String.valueOf(jobManagerPort)
};
commands.add(javaCommand);
commands.add("-Dlog.level=DEBUG");
commands.add("-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath());
commands.add("-Xms80m");
commands.add("-Xmx80m");
commands.add("-classpath");
commands.add(getCurrentClasspath());
commands.add(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName());

for (Map.Entry<String, String> keyValue: keyValues.entrySet()) {
commands.add("--" + keyValue.getKey());
commands.add(keyValue.getValue());
}

// start the first two TaskManager processes
taskManagerProcess1 = new ProcessBuilder(command).start();
taskManagerProcess1 = new ProcessBuilder(commands).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
taskManagerProcess2 = new ProcessBuilder(command).start();
taskManagerProcess2 = new ProcessBuilder(commands).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);

// the program will set a marker file in each of its parallel tasks once they are ready, so that
Expand All @@ -148,7 +162,7 @@ public void testTaskManagerProcessFailure() throws Exception {
@Override
public void run() {
try {
testTaskManagerFailure(restPort, coordinateDirClosure);
testTaskManagerFailure(config, coordinateDirClosure);
}
catch (Throwable t) {
t.printStackTrace();
Expand Down Expand Up @@ -176,7 +190,7 @@ public void run() {
}

// start the third TaskManager
taskManagerProcess3 = new ProcessBuilder(command).start();
taskManagerProcess3 = new ProcessBuilder(commands).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);

// kill one of the previous TaskManagers, triggering a failure and recovery
Expand Down Expand Up @@ -232,11 +246,11 @@ public void run() {
* The test program should be implemented here in a form of a separate thread.
* This provides a solution for checking that it has been terminated.
*
* @param jobManagerPort The port for submitting the topology to the local cluster
* @param configuration the config to use
* @param coordinateDir TaskManager failure will be triggered only after processes
* have successfully created file under this directory
*/
public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
public abstract void testTaskManagerFailure(Configuration configuration, File coordinateDir) throws Exception;

protected static void printProcessLog(String processName, String log) {
if (log == null || log.length() == 0) {
Expand Down Expand Up @@ -306,15 +320,8 @@ public static class TaskExecutorProcessEntryPoint {

public static void main(String[] args) {
try {
int jobManagerPort = Integer.parseInt(args[0]);

Configuration cfg = new Configuration();
cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
Configuration cfg = parameterTool.getConfiguration();

TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
Expand All @@ -53,16 +55,20 @@
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.StringWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand All @@ -84,6 +90,12 @@ public class ProcessFailureCancelingITCase extends TestLogger {
@Rule
public final BlobServerResource blobServerResource = new BlobServerResource();

@Rule
public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

@Test
public void testCancelingOnProcessFailure() throws Exception {
final StringWriter processOutput = new StringWriter();
Expand All @@ -93,23 +105,30 @@ public void testCancelingOnProcessFailure() throws Exception {
Process taskManagerProcess = null;
final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

Configuration jmConfig = new Configuration();
jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
jmConfig.setInteger(RestOptions.PORT, 0);

final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);

final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
final int jobManagerPort = rpcService.getPort();
jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
config.setInteger(JobManagerOptions.PORT, jobManagerPort);

final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE);
DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;

try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
jmConfig,
TestingUtils.defaultExecutor(),
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
TestingUtils.defaultExecutor(),
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

try {

// check that we run this test only if the java command
// is available on this machine
Expand All @@ -125,7 +144,7 @@ public void testCancelingOnProcessFailure() throws Exception {
CommonTestUtils.printLog4jDebugConfig(tempLogFile);

dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(
jmConfig,
config,
rpcService,
haServices,
blobServerResource.getBlobServer(),
Expand All @@ -134,26 +153,26 @@ public void testCancelingOnProcessFailure() throws Exception {
new MemoryArchivedExecutionGraphStore(),
fatalErrorHandler);

// update the rest ports
final int restPort = dispatcherResourceManagerComponent
.getWebMonitorEndpoint()
.getServerAddress()
.getPort();
jmConfig.setInteger(RestOptions.PORT, restPort);
final Map<String, String> keyValues = config.toMap();
final ArrayList<String> commands = new ArrayList<>((keyValues.size() << 1) + 8);

// the TaskManager java command
String[] command = new String[] {
javaCommand,
"-Dlog.level=DEBUG",
"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
"-Xms80m", "-Xmx80m",
"-classpath", getCurrentClasspath(),
AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
String.valueOf(jobManagerPort)
};
commands.add(javaCommand);
commands.add("-Dlog.level=DEBUG");
commands.add("-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath());
commands.add("-Xms80m");
commands.add("-Xmx80m");
commands.add("-classpath");
commands.add(getCurrentClasspath());
commands.add(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName());

for (Map.Entry<String, String> keyValue: keyValues.entrySet()) {
commands.add("--" + keyValue.getKey());
commands.add(keyValue.getValue());
}

// start the TaskManager process
taskManagerProcess = new ProcessBuilder(command).start();
taskManagerProcess = new ProcessBuilder(commands).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);

final Throwable[] errorRef = new Throwable[1];
Expand All @@ -163,7 +182,7 @@ public void testCancelingOnProcessFailure() throws Exception {
@Override
public void run() {
try {
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration());
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, config);
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
Expand Down Expand Up @@ -196,16 +215,10 @@ public Long map(Long value) throws Exception {
// kill the TaskManager
programThread.start();

final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L));

final DispatcherGateway dispatcherGateway = rpcService.connect(
leaderConnectionInfo.getAddress(),
DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
DispatcherGateway.class).get();

final DispatcherGateway dispatcherGateway = retrieveDispatcherGateway(rpcService, haServices);
waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);

clusterClient = new RestClusterClient<>(jmConfig, "standalone");
clusterClient = new RestClusterClient<>(config, "standalone");

final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout);

Expand Down Expand Up @@ -252,12 +265,31 @@ public Long map(Long value) throws Exception {
dispatcherResourceManagerComponent.close();
}

haServices.closeAndCleanupAllData();

fatalErrorHandler.rethrowError();

RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
}
}

/**
 * Resolves the current {@link Dispatcher} leader through the high availability services and
 * connects to it, so that callers only talk to a dispatcher whose fencing token is known.
 *
 * <p>The leader connection info is looked up via the dispatcher leader retriever of
 * {@code haServices}; its address and leader session id are then handed to
 * {@code rpcService.connect(...)} and the result is awaited with {@code get()}.
 *
 * @param rpcService to use to connect to the dispatcher
 * @param haServices high availability services to connect to the dispatcher
 * @return {@link DispatcherGateway} connected to the retrieved leader
 * @throws Exception if the leader retrieval or the connection attempt fails
 */
static DispatcherGateway retrieveDispatcherGateway(RpcService rpcService, HighAvailabilityServices haServices) throws Exception {
	final Time retrievalTimeout = Time.seconds(10L);
	final LeaderConnectionInfo dispatcherLeader = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
		haServices.getDispatcherLeaderRetriever(),
		retrievalTimeout);

	// the leader session id doubles as the dispatcher's fencing token
	final DispatcherId fencingToken = DispatcherId.fromUuid(dispatcherLeader.getLeaderSessionID());

	return rpcService
		.connect(dispatcherLeader.getAddress(), fencingToken, DispatcherGateway.class)
		.get();
}

private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
FutureUtils.retrySuccesfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(timeout),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ public static Collection<Object[]> executionMode() {
// --------------------------------------------------------------------------------------------

@Override
public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
public void testTaskManagerFailure(Configuration configuration, final File coordinateDir) throws Exception {

final Configuration configuration = new Configuration();
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
env.getConfig().setExecutionMode(executionMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
private static final int DATA_COUNT = 10000;

@Override
public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
public void testTaskManagerFailure(Configuration configuration, final File coordinateDir) throws Exception {

final File tempCheckpointDir = tempFolder.newFolder();

final Configuration configuration = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost",
jobManagerPort,
1337, // not needed since we use ZooKeeper
configuration);
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
Expand Down

0 comments on commit bbee77a

Please sign in to comment.