Skip to content

Commit

Permalink
[FLINK-12075][yarn] Set RestOptions.BIND_PORT only to 0 if not specified
Browse files Browse the repository at this point in the history
This commit changes the YarnEntrypointUtils#loadConfiguration so that it only
sets RestOptions.BIND_PORT to 0 if it has not been specified. This allows to
explicitly set a port range for Yarn applications which are running behind a
firewall, for example.

This closes apache#8096.
  • Loading branch information
tillrohrmann committed Apr 2, 2019
1 parent 065306d commit 5210948
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ public static Configuration loadConfiguration(String workingDirectory, Map<Strin
configuration.setInteger(WebOptions.PORT, 0);
}

// set the REST port to 0 to select it randomly
configuration.setString(RestOptions.BIND_PORT, "0");
if (!configuration.contains(RestOptions.BIND_PORT)) {
// set the REST port to 0 to select it randomly
configuration.setString(RestOptions.BIND_PORT, "0");
}

// if the user has set the deprecated YARN-specific config keys, we add the
// corresponding generic config keys instead. that way, later code needs not
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.util.TestLogger;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link YarnEntrypointUtils}.
*/
public class YarnEntrypointUtilsTest extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(YarnEntrypointUtilsTest.class);

@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

/**
* Tests that the REST ports are correctly set when loading a {@link Configuration}
* with unspecified REST options.
*/
@Test
public void testRestPortOptionsUnspecified() throws IOException {
final Configuration initialConfiguration = new Configuration();

final Configuration configuration = loadConfiguration(initialConfiguration);

// having not specified the ports should set the rest bind port to 0
assertThat(configuration.getString(RestOptions.BIND_PORT), is(equalTo("0")));
}

/**
* Tests that the binding REST port is set to the REST port if set.
*/
@Test
public void testRestPortSpecified() throws IOException {
final Configuration initialConfiguration = new Configuration();
final int port = 1337;
initialConfiguration.setInteger(RestOptions.PORT, port);

final Configuration configuration = loadConfiguration(initialConfiguration);

// if the bind port is not specified it should fall back to the rest port
assertThat(configuration.getString(RestOptions.BIND_PORT), is(equalTo(String.valueOf(port))));
}

/**
* Tests that the binding REST port has precedence over the REST port if both are set.
*/
@Test
public void testRestPortAndBindingPortSpecified() throws IOException {
final Configuration initialConfiguration = new Configuration();
final int port = 1337;
final String bindingPortRange = "1337-7331";
initialConfiguration.setInteger(RestOptions.PORT, port);
initialConfiguration.setString(RestOptions.BIND_PORT, bindingPortRange);

final Configuration configuration = loadConfiguration(initialConfiguration);

// bind port should have precedence over the rest port
assertThat(configuration.getString(RestOptions.BIND_PORT), is(equalTo(bindingPortRange)));
}

@Nonnull
private static Configuration loadConfiguration(Configuration initialConfiguration) throws IOException {
final File workingDirectory = TEMPORARY_FOLDER.newFolder();
final Map<String, String> env = new HashMap<>(4);
env.put(ApplicationConstants.Environment.NM_HOST.key(), "foobar");

BootstrapTools.writeConfiguration(initialConfiguration, new File(workingDirectory, "flink-conf.yaml"));
return YarnEntrypointUtils.loadConfiguration(workingDirectory.getAbsolutePath(), env, LOG);
}

}

0 comments on commit 5210948

Please sign in to comment.