Skip to content

Commit

Permalink
[TaskManager] Add test for failure behavior on TaskManager startup
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Apr 7, 2015
1 parent 0ca1f0c commit 4ed009e
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,7 @@ object TaskManager {
val cause = t.getCause()
if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
val address = taskManagerHostname + ":" + actorSystemPort
throw new Exception("Unable to bind TaskManager actor system to address " +
throw new IOException("Unable to bind TaskManager actor system to address " +
address + " - " + cause.getMessage(), t)
}
}
Expand Down Expand Up @@ -1532,22 +1532,28 @@ object TaskManager {
}

// now start the memory manager
val memoryManager = new DefaultMemoryManager(memorySize,
taskManagerConfig.numberOfSlots,
netConfig.networkBufferSize)
val memoryManager = try {
new DefaultMemoryManager(memorySize,
taskManagerConfig.numberOfSlots,
netConfig.networkBufferSize)
} catch {
case e: OutOfMemoryError => throw new Exception(
"OutOfMemory error (" + e.getMessage + ") while allocating the TaskManager memory (" +
memorySize + " bytes).", e)
}

// start the I/O manager last, it will create some temp directories.
val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)

// create the actor properties (which define the actor constructor parameters)
val tmProps = Props(taskManagerClass,
taskManagerConfig,
connectionInfo,
jobManagerAkkaUrl,
memoryManager,
ioManager,
network,
taskManagerConfig.numberOfSlots)
taskManagerConfig,
connectionInfo,
jobManagerAkkaUrl,
memoryManager,
ioManager,
network,
taskManagerConfig.numberOfSlots)

taskManagerActorName match {
case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;
Expand All @@ -53,7 +56,7 @@
* when connecting to the JobManager, and when the JobManager
* is unreachable.
*/
public class RegistrationTest {
public class TaskManagerRegistrationTest {

private static final Option<String> NONE_STRING = Option.empty();

Expand All @@ -68,7 +71,7 @@ public static void startActorSystem() {
config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
config.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);

actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
actorSystem = AkkaUtils.createLocalActorSystem(config);
}

@AfterClass
Expand Down Expand Up @@ -330,6 +333,67 @@ protected void run() {
}};
}


/**
 * Starts a TaskManager actor whose configured data port is already bound by
 * another socket and expects that actor to terminate within 20 seconds.
 *
 * NOTE(review): per the test name, the termination is presumably triggered by the
 * network stack failing to bind the occupied port - confirm against the TaskManager.
 */
@Test
public void testStartupWhenNetworkStackFailsToInitialize() {

ServerSocket blocker = null;
try {
// occupy an ephemeral port on localhost; it stays bound until the finally block
blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost"));

// point the TaskManager's data port at the port held by the blocker socket
final Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort());
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);

new JavaTestKit(actorSystem) {{
try {
// a simple JobManager
final ActorRef jobManager = startJobManager();

// start a task manager with a configuration that provides a blocked port
final ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
cfg, actorSystem, "localhost",
NONE_STRING, // no actor name -> random
new Some<String>(jobManager.path().toString()), // job manager path
false, // init network stack !!!
TaskManager.class);

// register for the termination notification before waiting for it
watch(taskManager);

// the actual assertion: the TaskManager actor must terminate within 20 seconds
expectTerminated(new FiniteDuration(20, TimeUnit.SECONDS), taskManager);

// clean up the actors (not reached if anything above throws an Exception)
stopActor(taskManager);
stopActor(jobManager);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}};
}
catch (Exception e) {
// any unexpected exception (e.g. the blocker socket could not be opened)
// fails the test; the test is NOT skipped in this case
e.printStackTrace();
fail(e.getMessage());
}
finally {
// release the blocked port regardless of the test outcome
if (blocker != null) {
try {
blocker.close();
}
catch (IOException e) {
// ignore, best effort
}
}
}
}

/**
 * Placeholder for a test of the startup behavior when the BLOB directories are
 * not writable. The body is empty, so this test currently always passes.
 */
@Test
public void testStartupWhenBlobDirectoriesAreNotWritable() {
// TODO: not implemented yet - this test asserts nothing

}

// --------------------------------------------------------------------------------------------
// Utility Functions
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskmanager;

import static org.junit.Assert.*;

import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.UUID;

/**
 * Tests that check how the TaskManager behaves when encountering startup problems.
 *
 * <p>All tests call {@code TaskManager.runTaskManager(...)} directly and expect the
 * call itself to throw, i.e. the startup problem must be reported synchronously.
 */
public class TestManagerStartupTest {

    /**
     * Tests that the TaskManager fails synchronously when the actor system port
     * is in use.
     */
    @Test
    public void testStartupWhenTaskmanagerActorPortIsUsed() {
        ServerSocket blocker = null;
        try {
            final String localHostName = "localhost";
            final InetAddress localAddress = InetAddress.getByName(localHostName);

            // block some port
            blocker = new ServerSocket(0, 50, localAddress);
            final int port = blocker.getLocalPort();

            try {
                TaskManager.runTaskManager(localHostName, port, new Configuration(), TaskManager.class);
                fail("This should fail with an IOException");
            }
            catch (IOException e) {
                // expected. validate the error message
                assertNotNull(e.getMessage());
                assertTrue(e.getMessage().contains("Address already in use"));
            }

        }
        catch (Exception e) {
            // anything other than the expected IOException is a test failure
            e.printStackTrace();
            fail(e.getMessage());
        }
        finally {
            if (blocker != null) {
                try {
                    blocker.close();
                }
                catch (IOException e) {
                    // no need to log here
                }
            }
        }
    }

    /**
     * Tests that the TaskManager startup fails synchronously when the I/O directories are
     * not writable.
     *
     * <p>The test is skipped (with a message on stderr) if a non-writable directory
     * cannot be set up on this machine.
     */
    @Test
    public void testIODirectoryNotWritable() {
        File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
        File nonWritable = new File(tempDir, UUID.randomUUID().toString());

        if (!nonWritable.mkdirs()) {
            System.err.println("Cannot create non-writable temporary file directory. Skipping test.");
            return;
        }
        if (!nonWritable.setWritable(false, false)) {
            // the directory was created above; do not leave it behind when skipping
            deleteBestEffort(nonWritable);
            System.err.println("Cannot create non-writable temporary file directory. Skipping test.");
            return;
        }

        try {
            Configuration cfg = new Configuration();
            cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
            cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
            cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
            cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);

            try {
                TaskManager.runTaskManager("localhost", 0, cfg);
                fail("Should fail synchronously with an exception");
            }
            catch (IOException e) {
                // splendid!
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
        finally {
            deleteBestEffort(nonWritable);
        }
    }

    /**
     * Tests that the TaskManager startup fails synchronously when the memory
     * configuration is wrong: a negative memory size must be rejected with an
     * {@link IllegalConfigurationException}, and a memory size that is too large
     * must result in an exception whose cause is an {@link OutOfMemoryError}.
     */
    @Test
    public void testMemoryConfigWrong() {
        try {
            Configuration cfg = new Configuration();
            cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
            cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);

            // something invalid
            cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
            try {
                TaskManager.runTaskManager("localhost", 0, cfg);
                fail("Should fail synchronously with an exception");
            }
            catch (IllegalConfigurationException e) {
                // splendid!
            }

            // something ridiculously high
            // NOTE(review): the ">> 20" presumably converts bytes to megabytes - confirm
            // the unit expected by TASK_MANAGER_MEMORY_SIZE_KEY
            final long memSize = (((long) Integer.MAX_VALUE - 1) *
                    ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
            cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
            try {
                TaskManager.runTaskManager("localhost", 0, cfg);
                fail("Should fail synchronously with an exception");
            }
            catch (Exception e) {
                // splendid!
                assertTrue(e.getCause() instanceof OutOfMemoryError);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
    }

    /**
     * Makes the given directory writable again and deletes it recursively.
     * Failures are ignored, since this is cleanup only.
     */
    private static void deleteBestEffort(File directory) {
        //noinspection ResultOfMethodCallIgnored
        directory.setWritable(true, false);
        try {
            FileUtils.deleteDirectory(directory);
        }
        catch (IOException e) {
            // best effort
        }
    }
}

0 comments on commit 4ed009e

Please sign in to comment.