Skip to content

Commit

Permalink
[FLINK-19295][yarn][tests] Exclude more meaningless akka shutdown err…
Browse files Browse the repository at this point in the history
…ors (apache#13439)
  • Loading branch information
rmetzger authored Oct 5, 2020
1 parent 713d02e commit 6c7b195
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -123,25 +124,26 @@ public abstract class YarnTestBase extends TestLogger {
};

/**
 * These strings are white-listed, overriding the prohibited strings.
 *
 * <p>NOTE(review): this span appears to be a rendered diff without +/- markers. The
 * {@code String[]} declaration and the bare string literals look like the removed version;
 * the {@code Pattern[]} declaration and the {@code Pattern.compile(...)} entries look like
 * the added version. In the {@code Pattern[]} version every entry is a regular expression,
 * so regex metacharacters in an entry are significant.
 */
protected static final String[] WHITELISTED_STRINGS = {
"akka.remote.RemoteTransportExceptionNoStackTrace",
protected static final Pattern[] WHITELISTED_STRINGS = {
Pattern.compile("akka\\.remote\\.RemoteTransportExceptionNoStackTrace"),
// workaround for annoying InterruptedException logging:
// https://issues.apache.org/jira/browse/YARN-1022
"java.lang.InterruptedException",
// very specific on purpose
"Remote connection to [null] failed with java.net.ConnectException: Connection refused",
"Remote connection to [null] failed with java.nio.channels.NotYetConnectedException",
"java.io.IOException: Connection reset by peer",
Pattern.compile("java\\.lang\\.InterruptedException"),
// very specific on purpose; whitelist meaningless exceptions that occur during akka shutdown:
// NOTE(review): in the next two patterns the '.' characters inside the exception class names
// are not escaped (only the square brackets are), so each '.' matches any character.
Pattern.compile("Remote connection to \\[null\\] failed with java.net.ConnectException: Connection refused"),
Pattern.compile("Remote connection to \\[null\\] failed with java.nio.channels.NotYetConnectedException"),
Pattern.compile("java\\.io\\.IOException: Connection reset by peer"),
// The character class [^]]+ stands in for the variable host/port part between square brackets.
// NOTE(review): the '.' in "akka.tcp", "ms." and "java.net.ConnectException" is unescaped here as well.
Pattern.compile("Association with remote system \\[akka.tcp:https://flink@[^]]+\\] has failed, address is now gated for \\[50\\] ms. Reason: \\[Association failed with \\[akka.tcp:https://flink@[^]]+\\]\\] Caused by: \\[java.net.ConnectException: Connection refused: [^]]+\\]"),

// filter out expected ResourceManagerException caused by intended shutdown request
YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST,
// NOTE(review): the constant is compiled as a regular expression as-is. Its value is not
// visible here; if it contains regex metacharacters, Pattern.quote(...) would be needed
// to keep a literal match - TODO confirm.
Pattern.compile(YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST),

// this can happen in Akka 2.4 on shutdown.
"java.util.concurrent.RejectedExecutionException: Worker has already been shutdown",
Pattern.compile("java\\.util\\.concurrent\\.RejectedExecutionException: Worker has already been shutdown"),

"org.apache.flink.util.FlinkException: Stopping JobMaster",
"org.apache.flink.util.FlinkException: JobManager is shutting down.",
"lost the leadership."
// NOTE(review): in the two FlinkException patterns the '.' between "flink" and "util" is not
// escaped, and the trailing '.' of "lost the leadership." is not escaped either; both match
// any character rather than a literal dot.
Pattern.compile("org\\.apache\\.flink.util\\.FlinkException: Stopping JobMaster"),
Pattern.compile("org\\.apache\\.flink.util\\.FlinkException: JobManager is shutting down\\."),
Pattern.compile("lost the leadership.")
};

// Temp directory which is deleted after the unit test.
Expand Down Expand Up @@ -390,7 +392,7 @@ private static void writeHDFSSiteConfigXML(Configuration coreSite, File targetFo
* So always run "mvn clean" before running the tests here.
*
*/
public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final Pattern[] whitelisted) {
File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
Expand All @@ -400,17 +402,18 @@ public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited,
@Override
public boolean accept(File dir, String name) {
// scan each file for prohibited strings.
File f = new File(dir.getAbsolutePath() + "/" + name);
File logFile = new File(dir.getAbsolutePath() + "/" + name);
try {
BufferingScanner scanner = new BufferingScanner(new Scanner(f), 10);
BufferingScanner scanner = new BufferingScanner(new Scanner(logFile), 10);
while (scanner.hasNextLine()) {
final String lineFromFile = scanner.nextLine();
for (String aProhibited : prohibited) {
if (lineFromFile.contains(aProhibited)) {

boolean whitelistedFound = false;
for (String white : whitelisted) {
if (lineFromFile.contains(white)) {
for (Pattern whitelistPattern : whitelisted) {
Matcher whitelistMatch = whitelistPattern.matcher(lineFromFile);
if (whitelistMatch.find()) {
whitelistedFound = true;
break;
}
Expand All @@ -419,7 +422,7 @@ public boolean accept(File dir, String name) {
if (!whitelistedFound) {
// logging in FATAL to see the actual message in CI tests.
Marker fatal = MarkerFactory.getMarker("FATAL");
LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, f.getAbsolutePath(), lineFromFile);
LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, logFile.getAbsolutePath(), lineFromFile);

StringBuilder logExcerpt = new StringBuilder();

Expand Down Expand Up @@ -456,7 +459,7 @@ public boolean accept(File dir, String name) {

}
} catch (FileNotFoundException e) {
LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath());
LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + logFile.getAbsolutePath());
}

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.junit.Assert;
import org.junit.Test;

import java.util.regex.Pattern;

/**
 * Tests for {@link YarnTestBase}.
 *
 * <p>Verifies that sample log lines are accepted by at least one of the patterns in
 * {@code YarnTestBase.WHITELISTED_STRINGS}.
 */
public class YarnTestBaseTest {

	/**
	 * Each probe below is a log line that must be covered by the whitelist; the test fails
	 * on the first probe that no whitelist pattern matches.
	 */
	@Test
	public void ensureWhitelistEntryMatches() {
		final String[] probes = {
			"465 java.lang.InterruptedException: sleep interrupted",
			"2020-09-19 22:06:19,458 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp:https://flink@e466f3e261f3:42352] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:https://flink@e466f3e261f3:42352]] Caused by: [java.net.ConnectException: Connection refused: e466f3e261f3/192.168.224.2:42352]"
		};

		for (String probe : probes) {
			ensureWhitelistEntryMatch(probe);
		}
	}

	/**
	 * Fails the running test unless some whitelist pattern is found within {@code probe}.
	 *
	 * @param probe the log line to check against the whitelist
	 */
	private void ensureWhitelistEntryMatch(String probe) {
		final Pattern[] whitelist = YarnTestBase.WHITELISTED_STRINGS;

		// find() is used, so a pattern only needs to occur somewhere inside the probe.
		boolean matched = false;
		for (int i = 0; i < whitelist.length && !matched; i++) {
			matched = whitelist[i].matcher(probe).find();
		}

		if (!matched) {
			Assert.fail("The following string didn't match any whitelisted patterns '" + probe + "'");
		}
	}
}

0 comments on commit 6c7b195

Please sign in to comment.