Skip to content

Commit

Permalink
[FLINK-1669] [streaming] Clean up and fix test for streaming fault to…
Browse files Browse the repository at this point in the history
…lerance with proess failures
  • Loading branch information
StephanEwen committed Mar 29, 2015
1 parent 56afefc commit c284745
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,6 @@ public StreamExecutionEnvironment setParallelism(int parallelism) {
return this;
}

/**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of {@code -1} indicates that the system
* default value (as defined in the configuration) should be used.
*
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
config.setNumberOfExecutionRetries(numberOfExecutionRetries);
}

/**
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. By default the output buffers flush frequently to provide
Expand Down Expand Up @@ -397,10 +386,10 @@ public DataStreamSource<String> readTextFile(String filePath, String charsetName
* The interval of file watching in milliseconds.
* @param watchType
* The watch type of file stream. When watchType is
* {@link WatchType.ONLY_NEW_FILES}, the system processes only
* new files. {@link WatchType.REPROCESS_WITH_APPENDED} means
* {@link WatchType#ONLY_NEW_FILES}, the system processes only
* new files. {@link WatchType#REPROCESS_WITH_APPENDED} means
* that the system re-processes all contents of appended file.
* {@link WatchType.PROCESS_ONLY_APPENDED} means that the system
* {@link WatchType#PROCESS_ONLY_APPENDED} means that the system
* processes only appended contents of files.
*
* @return The DataStream containing the given directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Interface for implementing user defined sink functionality.
*
* @param <IN> INput type parameter.
* @param <IN> Input type parameter.
*/
public interface SinkFunction<IN> extends Function, Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.test.util;
package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand All @@ -40,14 +40,11 @@
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -58,7 +55,7 @@
import static org.junit.Assert.fail;

/**
* This is a testbase for tests verifying the behavior of the recovery in the
* Abstract base for tests verifying the behavior of the recovery in the
* case when a TaskManager fails (process is killed) in the middle of a job execution.
*
* The test works with multiple task managers processes by spawning JVMs.
Expand All @@ -70,7 +67,7 @@
* guaranteed to remain empty (all tasks are already deployed) and kills one of
* the original task managers. The recovery should restart the tasks on the new TaskManager.
*/
public abstract class ProcessFailureRecoveryTestBase {
public abstract class AbstractProcessFailureRecoveryTest {

protected static final String READY_MARKER_FILE_PREFIX = "ready_";
protected static final String PROCEED_MARKER_FILE = "proceed";
Expand All @@ -96,7 +93,7 @@ public void testTaskManagerProcessFailure() {
// is available on this machine
String javaCommand = getJavaCommandPath();
if (javaCommand == null) {
System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable");
System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
return;
}

Expand All @@ -118,12 +115,13 @@ public void testTaskManagerProcessFailure() {
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "2 s");

jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();

// the TaskManager java command
String[] command = new String[]{
String[] command = new String[] {
javaCommand,
"-Dlog.level=DEBUG",
"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
Expand All @@ -150,8 +148,19 @@ public void testTaskManagerProcessFailure() {
final File coordinateDirClosure = coordinateTempDir;
final Throwable[] errorRef = new Throwable[1];

// get a trigger for the test program implemented by a subclass
Thread programTrigger = testProgram(jobManagerPort, coordinateDirClosure, errorRef);
// we trigger program execution in a separate thread
Thread programTrigger = new Thread("Program Trigger") {
@Override
public void run() {
try {
testProgram(jobManagerPort, coordinateDirClosure);
}
catch (Throwable t) {
t.printStackTrace();
errorRef[0] = t;
}
}
};

//start the test program
programTrigger.start();
Expand Down Expand Up @@ -180,9 +189,6 @@ public void testTaskManagerProcessFailure() {
// check that the program really finished
assertFalse("The program did not finish in time", programTrigger.isAlive());

// apply post submission checks specified by the subclass
postSubmit();

// check whether the program encountered an error
if (errorRef[0] != null) {
Throwable error = errorRef[0];
Expand All @@ -191,20 +197,22 @@ public void testTaskManagerProcessFailure() {
}

// all seems well :-)

} catch (Exception e) {
}
catch (Exception e) {
e.printStackTrace();
printProcessLog("TaskManager 1", processOutput1.toString());
printProcessLog("TaskManager 2", processOutput2.toString());
printProcessLog("TaskManager 3", processOutput3.toString());
fail(e.getMessage());
} catch (Error e) {
}
catch (Error e) {
e.printStackTrace();
printProcessLog("TaskManager 1", processOutput1.toString());
printProcessLog("TaskManager 2", processOutput2.toString());
printProcessLog("TaskManager 3", processOutput3.toString());
throw e;
} finally {
}
finally {
if (taskManagerProcess1 != null) {
taskManagerProcess1.destroy();
}
Expand Down Expand Up @@ -233,21 +241,10 @@ public void testTaskManagerProcessFailure() {
* This provides a solution for checking that it has been terminated.
*
* @param jobManagerPort The port for submitting the topology to the local cluster
* @param coordinateDirClosure taskmanager failure will be triggered only after proccesses
* @param coordinateDir TaskManager failure will be triggered only after processes
* have successfully created file under this directory
* @param errorRef Errors passed back to the superclass
* @return thread containing the test program
*/
abstract public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef);

/**
* Check to be carried out after the completion of the test program thread.
* In case of failed checks {@link java.lang.AssertionError} should be thrown.
*
* @throws Error
* @throws Exception
*/
abstract public void postSubmit() throws Error, Exception;
public abstract void testProgram(int jobManagerPort, File coordinateDir) throws Exception;


protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
Expand Down Expand Up @@ -280,40 +277,6 @@ protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int nu
}
}

public static void fileBatchHasEveryNumberLower(int n, String path) throws IOException, AssertionError {

HashSet<Integer> set = new HashSet<Integer>(n);

int counter = 0;
File file = new File(path + "-" + counter);

while (file.exists()) {

BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

String line = bufferedReader.readLine();

while (line != null) {
int num = Integer.parseInt(line);

set.add(num);

line = bufferedReader.readLine();
}

bufferedReader.close();
file.delete();
counter++;
file = new File(path + "-" + counter);
}

for (int i = 0; i < n; i++) {
if (!set.contains(i)) {
throw new AssertionError("Missing number: " + i);
}
}
}

protected static void printProcessLog(String processName, String log) {
if (log == null || log.length() == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,32 @@

package org.apache.flink.test.recovery;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.ProcessFailureRecoveryTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;

import static org.junit.Assert.assertEquals;

/**
* Test for streaming program behaviour in case of taskmanager failure
* based on {@link ProcessFailureRecoveryTestBase}.
* Test the recovery of a simple batch program in the case of TaskManager process failure.
*/
@SuppressWarnings("serial")
public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRecoveryITCase {
@RunWith(Parameterized.class)
public class ProcessFailureBatchRecoveryITCase extends AbstractProcessFailureRecoveryTest {

private ExecutionMode executionMode;
// --------------------------------------------------------------------------------------------
// Parametrization (run pipelined and batch)
// --------------------------------------------------------------------------------------------

private final ExecutionMode executionMode;

public ProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
this.executionMode = executionMode;
Expand All @@ -48,12 +56,17 @@ public static Collection<Object[]> executionMode() {
{ExecutionMode.BATCH}});
}

// --------------------------------------------------------------------------------------------
// Test the program
// --------------------------------------------------------------------------------------------

@Override
public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) {
public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
env.setDegreeOfParallelism(PARALLELISM);
env.setParallelism(PARALLELISM);
env.setNumberOfExecutionRetries(1);
env.getConfig().setExecutionMode(executionMode);

final long NUM_ELEMENTS = 100000L;
final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
Expand All @@ -63,7 +76,7 @@ public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, f
// the majority of the behavior is in the MapFunction
.map(new RichMapFunction<Long, Long>() {

private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE);
private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);

private boolean markerCreated = false;
private boolean checkForProceedFile = true;
Expand All @@ -72,7 +85,7 @@ public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, f
public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
}

Expand All @@ -95,24 +108,7 @@ public Long reduce(Long value1, Long value2) {
}
});

// we trigger program execution in a separate thread
return new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") {
@Override
public void run() {
try {
long sum = result.collect().get(0);
assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
} catch (Throwable t) {
t.printStackTrace();
errorRef[0] = t;
}
}
};
}

@Override
public void postSubmit() throws Exception, Error {
// unnecessary
long sum = result.collect().get(0);
assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
}

}
Loading

0 comments on commit c284745

Please sign in to comment.