[FLINK-11135][configuration] Reorder Hadoop config loading in HadoopUtils

This closes apache#7314.
link3280 authored and tisonkun committed Jan 8, 2020
1 parent 6ff3928 commit dffa19b
Showing 2 changed files with 154 additions and 39 deletions.
@@ -50,63 +50,62 @@ public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {

         // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
         // from the classpath

         Configuration result = new HdfsConfiguration();
         boolean foundHadoopConfiguration = false;

         // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-        // the hdfs configuration
-        // Try to load HDFS configuration from Hadoop's own configuration files
-        // 1. approach: Flink configuration
+        // the hdfs configuration.
+        // The properties of a newly added resource will override the ones in previous resources, so a configuration
+        // file with higher priority should be added later.
+
+        // Approach 1: HADOOP_HOME environment variables
+        String[] possibleHadoopConfPaths = new String[2];
+
+        final String hadoopHome = System.getenv("HADOOP_HOME");
+        if (hadoopHome != null) {
+            LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
+            possibleHadoopConfPaths[0] = hadoopHome + "/conf";
+            possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+        }
+
+        for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+            if (possibleHadoopConfPath != null) {
+                foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
+            }
+        }
+
+        // Approach 2: Flink configuration (deprecated)
         final String hdfsDefaultPath =
             flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);

         if (hdfsDefaultPath != null) {
             result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-            LOG.debug("Using hdfs-default configuration-file path form Flink config: {}", hdfsDefaultPath);
+            LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
             foundHadoopConfiguration = true;
         } else {
             LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
         }

         final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
         if (hdfsSitePath != null) {
             result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-            LOG.debug("Using hdfs-site configuration-file path form Flink config: {}", hdfsSitePath);
+            LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
             foundHadoopConfiguration = true;
         } else {
             LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
         }

-        // 2. Approach environment variables
-        String[] possibleHadoopConfPaths = new String[4];
-        possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-        possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
-        final String hadoopHome = System.getenv("HADOOP_HOME");
-        if (hadoopHome != null) {
-            possibleHadoopConfPaths[2] = hadoopHome + "/conf";
-            possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+        final String hadoopConfigPath = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+        if (hadoopConfigPath != null) {
+            LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
+            foundHadoopConfiguration = addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
         }

-        for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-            if (possibleHadoopConfPath != null) {
-                if (new File(possibleHadoopConfPath).exists()) {
-                    if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-                        result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-                        LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-                        foundHadoopConfiguration = true;
-                    }
-                    if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-                        result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-                        LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-                        foundHadoopConfiguration = true;
-                    }
-                }
-            }
+        // Approach 3: HADOOP_CONF_DIR environment variable
+        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+        if (hadoopConfDir != null) {
+            LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
+            foundHadoopConfiguration = addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
         }

         if (!foundHadoopConfiguration) {
-            LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
+            LOG.warn("Could not find Hadoop configuration via any of the supported methods " +
                 "(Flink configuration, environment variables).");
         }

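Note on the comment about resource priority above: Hadoop's Configuration resolves duplicate keys by letting a resource added later override the ones added earlier, which is why the reordered code registers the lowest-priority locations first. A minimal sketch of that behavior, with hypothetical paths:

    // Sketch only; both paths are hypothetical. A key defined in both files
    // takes its value from the resource that was added last.
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.addResource(new org.apache.hadoop.fs.Path("/opt/hadoop/conf/core-site.xml"));  // added first: lowest priority
    conf.addResource(new org.apache.hadoop.fs.Path("/etc/hadoop/conf/core-site.xml"));  // added last: wins on conflicts
    String defaultFs = conf.get("fs.defaultFS");  // resolved from the highest-priority resource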
@@ -162,4 +161,24 @@ private static Tuple2<Integer, Integer> getMajorMinorBundledHadoopVersion() {
         int min = Integer.parseInt(versionParts[1]);
         return Tuple2.of(maj, min);
     }
+
+    /**
+     * Search Hadoop configuration files in the given path, and add them to the configuration if found.
+     */
+    private static boolean addHadoopConfIfFound(Configuration configuration, String possibleHadoopConfPath) {
+        boolean foundHadoopConfiguration = false;
+        if (new File(possibleHadoopConfPath).exists()) {
+            if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+                configuration.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+                LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+                foundHadoopConfiguration = true;
+            }
+            if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+                configuration.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+                LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+                foundHadoopConfiguration = true;
+            }
+        }
+        return foundHadoopConfiguration;
+    }
 }
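With the helper above in place, the effective lookup order is HADOOP_HOME (conf/, then etc/hadoop/), the deprecated Flink configuration keys, and finally HADOOP_CONF_DIR, so HADOOP_CONF_DIR wins whenever the same key appears in several locations. A hedged usage sketch (the directory path is hypothetical):

    // Sketch only: obtain a Hadoop Configuration through the reordered loader.
    org.apache.flink.configuration.Configuration flinkConf = new org.apache.flink.configuration.Configuration();
    flinkConf.setString(ConfigConstants.PATH_HADOOP_CONFIG, "/opt/flink/hadoop-conf");  // hypothetical path
    // If HADOOP_CONF_DIR is also set in the environment, its files are added
    // last and therefore override those found via the Flink configuration.
    org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConf);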
@@ -180,15 +180,111 @@ public void loadFromEnvVariables() throws Exception {
         assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
     }

+    @Test
+    public void loadOverlappingConfig() throws Exception {
+        final String k1 = "key1";
+        final String k2 = "key2";
+        final String k3 = "key3";
+        final String k4 = "key4";
+        final String k5 = "key5";
+
+        final String v1 = "from HADOOP_CONF_DIR";
+        final String v2 = "from Flink config `fs.hdfs.hadoopconf`";
+        final String v3 = "from Flink config `fs.hdfs.hdfsdefault`";
+        final String v4 = "from HADOOP_HOME/etc/hadoop";
+        final String v5 = "from HADOOP_HOME/conf";
+
+        final File hadoopConfDir = tempFolder.newFolder("hadoopConfDir");
+        final File hadoopConfEntryDir = tempFolder.newFolder("hadoopConfEntryDir");
+        final File legacyConfDir = tempFolder.newFolder("legacyConfDir");
+        final File hadoopHome = tempFolder.newFolder("hadoopHome");
+
+        final File hadoopHomeConf = new File(hadoopHome, "conf");
+        final File hadoopHomeEtc = new File(hadoopHome, "etc/hadoop");
+
+        assertTrue(hadoopHomeConf.mkdirs());
+        assertTrue(hadoopHomeEtc.mkdirs());
+
+        final File file1 = new File(hadoopConfDir, "core-site.xml");
+        final File file2 = new File(hadoopConfEntryDir, "core-site.xml");
+        final File file3 = new File(legacyConfDir, "core-site.xml");
+        final File file4 = new File(hadoopHomeEtc, "core-site.xml");
+        final File file5 = new File(hadoopHomeConf, "core-site.xml");
+
+        printConfig(file1, k1, v1);
+
+        Map<String, String> properties2 = new HashMap<>();
+        properties2.put(k1, v2);
+        properties2.put(k2, v2);
+        printConfigs(file2, properties2);
+
+        Map<String, String> properties3 = new HashMap<>();
+        properties3.put(k1, v3);
+        properties3.put(k2, v3);
+        properties3.put(k3, v3);
+        printConfigs(file3, properties3);
+
+        Map<String, String> properties4 = new HashMap<>();
+        properties4.put(k1, v4);
+        properties4.put(k2, v4);
+        properties4.put(k3, v4);
+        properties4.put(k4, v4);
+        printConfigs(file4, properties4);
+
+        Map<String, String> properties5 = new HashMap<>();
+        properties5.put(k1, v5);
+        properties5.put(k2, v5);
+        properties5.put(k3, v5);
+        properties5.put(k4, v5);
+        properties5.put(k5, v5);
+        printConfigs(file5, properties5);
+
+        final Configuration cfg = new Configuration();
+        cfg.setString(ConfigConstants.PATH_HADOOP_CONFIG, hadoopConfEntryDir.getAbsolutePath());
+        cfg.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, file3.getAbsolutePath());
+
+        final org.apache.hadoop.conf.Configuration hadoopConf;
+
+        final Map<String, String> originalEnv = System.getenv();
+        final Map<String, String> newEnv = new HashMap<>(originalEnv);
+        newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+        newEnv.put("HADOOP_HOME", hadoopHome.getAbsolutePath());
+        try {
+            CommonTestUtils.setEnv(newEnv);
+            hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+        }
+        finally {
+            CommonTestUtils.setEnv(originalEnv);
+        }
+
+        // contains extra entries
+        assertEquals(v1, hadoopConf.get(k1, null));
+        assertEquals(v2, hadoopConf.get(k2, null));
+        assertEquals(v3, hadoopConf.get(k3, null));
+        assertEquals(v4, hadoopConf.get(k4, null));
+        assertEquals(v5, hadoopConf.get(k5, null));
+
+        // also contains classpath defaults
+        assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+    }
+
+    private static void printConfig(File file, String key, String value) throws IOException {
+        Map<String, String> map = new HashMap<>(1);
+        map.put(key, value);
+        printConfigs(file, map);
+    }
+
-    private static void printConfig(File file, String key, String value) throws IOException {
+    private static void printConfigs(File file, Map<String, String> properties) throws IOException {
         try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
             out.println("<?xml version=\"1.0\"?>");
             out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
             out.println("<configuration>");
-            out.println("\t<property>");
-            out.println("\t\t<name>" + key + "</name>");
-            out.println("\t\t<value>" + value + "</value>");
-            out.println("\t</property>");
+            for (Map.Entry<String, String> entry: properties.entrySet()) {
+                out.println("\t<property>");
+                out.println("\t\t<name>" + entry.getKey() + "</name>");
+                out.println("\t\t<value>" + entry.getValue() + "</value>");
+                out.println("\t</property>");
+            }
             out.println("</configuration>");
         }
     }
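The loadOverlappingConfig test above verifies exactly this precedence: key1, defined in every location, resolves to the HADOOP_CONF_DIR value, while key5, present only in HADOOP_HOME/conf, survives from the lowest-priority source. For reference, the printConfigs helper emits a standard Hadoop-style configuration file; a single-entry map produces content like:

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <name>key1</name>
            <value>from HADOOP_CONF_DIR</value>
        </property>
    </configuration>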
