Skip to content

Commit

Permalink
[FLINK-20214][k8s] Fix the unnecessary warning logs when Hadoop envir…
Browse files Browse the repository at this point in the history
…onment is not set

This closes apache#14132.
  • Loading branch information
wangyang0918 authored and xintongsong committed Nov 24, 2020
1 parent 2817502 commit 00ee23b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,23 @@ public Optional<String> getExistingHadoopConfigurationConfigMap() {

@Override
public Optional<String> getLocalHadoopConfigurationDirectory() {
final String[] possibleHadoopConfPaths = new String[] {
System.getenv(Constants.ENV_HADOOP_CONF_DIR),
System.getenv(Constants.ENV_HADOOP_HOME) + "/etc/hadoop", // hadoop 2.2
System.getenv(Constants.ENV_HADOOP_HOME) + "/conf"
};

for (String hadoopConfPath: possibleHadoopConfPaths) {
if (StringUtils.isNotBlank(hadoopConfPath)) {
return Optional.of(hadoopConfPath);
final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
return Optional.of(hadoopConfDirEnv);
}

final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
if (StringUtils.isNotBlank(hadoopHomeEnv)) {
// Hadoop 2.2+
final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
if (hadoop2ConfDir.exists()) {
return Optional.of(hadoop2ConfDir.getAbsolutePath());
}

// Hadoop 1.x
final File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");
if (hadoop1ConfDir.exists()) {
return Optional.of(hadoop1ConfDir.getAbsolutePath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
Expand All @@ -43,6 +50,9 @@ public class AbstractKubernetesParametersTest extends TestLogger {
private final Configuration flinkConfig = new Configuration();
private final TestingKubernetesParameters testingKubernetesParameters = new TestingKubernetesParameters(flinkConfig);

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Test
public void testClusterIdMustNotBeBlank() {
flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, " ");
Expand Down Expand Up @@ -78,6 +88,67 @@ public void getConfigDirectoryFallbackToPodConfDir() {
assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDirInPod));
}

@Test
public void testGetLocalHadoopConfigurationDirectoryReturnEmptyWhenHadoopEnvIsNotSet() throws Exception {
runTestWithEmptyEnv(() -> {
final Optional<String> optional = testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
assertThat(optional.isPresent(), is(false));
});
}

@Test
public void testGetLocalHadoopConfigurationDirectoryFromHadoopConfDirEnv() throws Exception {
runTestWithEmptyEnv(() -> {
final String hadoopConfDir = "/etc/hadoop/conf";
setEnv(Constants.ENV_HADOOP_CONF_DIR, hadoopConfDir);

final Optional<String> optional = testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
assertThat(optional.isPresent(), is(true));
assertThat(optional.get(), is(hadoopConfDir));
});
}

@Test
public void testGetLocalHadoopConfigurationDirectoryFromHadoop2HomeEnv() throws Exception {
runTestWithEmptyEnv(() -> {
final String hadoopHome = temporaryFolder.getRoot().getAbsolutePath();
temporaryFolder.newFolder("etc", "hadoop");
setEnv(Constants.ENV_HADOOP_HOME, hadoopHome);

final Optional<String> optional = testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
assertThat(optional.isPresent(), is(true));
assertThat(optional.get(), is(hadoopHome + "/etc/hadoop"));
});
}

@Test
public void testGetLocalHadoopConfigurationDirectoryFromHadoop1HomeEnv() throws Exception {
runTestWithEmptyEnv(() -> {
final String hadoopHome = temporaryFolder.getRoot().getAbsolutePath();
temporaryFolder.newFolder("conf");
setEnv(Constants.ENV_HADOOP_HOME, hadoopHome);

final Optional<String> optional = testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
assertThat(optional.isPresent(), is(true));
assertThat(optional.get(), is(hadoopHome + "/conf"));
});
}

private void runTestWithEmptyEnv(RunnableWithException testMethod) throws Exception {
final Map<String, String> current = new HashMap<>(System.getenv());
// Clear the environments
CommonTestUtils.setEnv(Collections.emptyMap(), true);
testMethod.run();
// Restore the environments
CommonTestUtils.setEnv(current, true);
}

private void setEnv(String key, String value) {
final Map<String, String> map = new HashMap<>();
map.put(key, value);
CommonTestUtils.setEnv(map, false);
}

/**
* KubernetesParameters for testing usecase.
*/
Expand Down

0 comments on commit 00ee23b

Please sign in to comment.