Skip to content

Commit

Permalink
[flink-yarn-tests] Add check for exceptions in the flink logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Mar 4, 2015
1 parent 94a66d5 commit 7abed95
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public void testClientStartup() {
"-jm", "512",
"-tm", "1024", "-qu", "qa-team"},
"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);

ensureNoExceptionsInLogFiles();
}


Expand All @@ -71,5 +73,7 @@ public void testNonexistingQueue() {
"-tm", "1024",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", RunTypes.YARN_SESSION);
checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");

ensureNoExceptionsInLogFiles();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ public void testClientStartup() {
"-tm", "1024"},
"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
LOG.info("Finished testClientStartup()");
ensureNoExceptionsInLogFiles();
}


/**
* Test querying the YARN cluster.
*
Expand All @@ -80,6 +82,7 @@ public void testQueryCluster() {
LOG.info("Starting testQueryCluster()");
runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
LOG.info("Finished testQueryCluster()");
ensureNoExceptionsInLogFiles();
}

/**
Expand All @@ -95,6 +98,7 @@ public void testNonexistingQueue() {
"-tm", "1024",
"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
LOG.info("Finished testNonexistingQueue()");
ensureNoExceptionsInLogFiles();
}

/**
Expand All @@ -113,6 +117,7 @@ public void testMoreNodesThanAvailable() {
"-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware
LOG.info("Finished testMoreNodesThanAvailable()");
checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.");
ensureNoExceptionsInLogFiles();
}

/**
Expand Down Expand Up @@ -171,6 +176,7 @@ public void testfullAlloc() {
LOG.info("Finished testfullAlloc()");
checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
ensureNoExceptionsInLogFiles();
}

/**
Expand All @@ -188,6 +194,7 @@ public void perJobYarnCluster() {
"-yjm", "512",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
LOG.info("Finished perJobYarnCluster()");
ensureNoExceptionsInLogFiles();
}

/**
Expand Down Expand Up @@ -244,6 +251,8 @@ public void testJavaAPI() {
// shutdown cluster
yarnCluster.shutdown();
LOG.info("Finished testJavaAPI()");

ensureNoExceptionsInLogFiles();
}

public boolean ignoreOnTravis() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
Expand All @@ -49,6 +50,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;


/**
Expand All @@ -64,6 +66,8 @@ public abstract class YarnTestBase {
private final static PrintStream originalStdout = System.out;
private final static PrintStream originalStderr = System.err;

private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";


// Temp directory which is deleted after the unit test.
private static TemporaryFolder tmp = new TemporaryFolder();
Expand Down Expand Up @@ -216,6 +220,54 @@ public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOExcep
return yarnSiteXML;
}

/**
* This method checks the written TaskManager and JobManager log files
* for exceptions.
*/
public static void ensureNoExceptionsInLogFiles() {
File cwd = new File("target/"+TEST_CLUSTER_NAME);
Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
System.out.println("cwd = "+cwd.getAbsolutePath());
File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
File f = new File(dir.getAbsolutePath()+ "/" + name);
// scan each file for 'Exception'.
Scanner scanner = null;
try {
scanner = new Scanner(f);
} catch (FileNotFoundException e) {
Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
}
while (scanner.hasNextLine()) {
final String lineFromFile = scanner.nextLine();
if(lineFromFile.contains("Exception")) {
return true;
}
}
return false;
}
});
if(foundFile != null) {
Scanner scanner = null;
try {
scanner = new Scanner(foundFile);
} catch (FileNotFoundException e) {
Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
}
LOG.warn("Found a file with an exception. Printing contents:");
while (scanner.hasNextLine()) {
LOG.warn("LINE: "+scanner.nextLine());
}
Assert.fail("Found a file "+foundFile+" with an exception");
}
}

public static void main(String[] args) {
ensureNoExceptionsInLogFiles();
}

public static void startYARNWithConfig(Configuration conf) {
flinkUberjar = findFile(".", new RootDirFilenameFilter());
Assert.assertNotNull(flinkUberjar);
Expand All @@ -228,7 +280,7 @@ public static void startYARNWithConfig(Configuration conf) {
try {
LOG.info("Starting up MiniYARN cluster");
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(YarnTestBase.class.getName(), 2, 1, 1);
yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);

yarnCluster.init(conf);
yarnCluster.start();
Expand Down

0 comments on commit 7abed95

Please sign in to comment.