Skip to content

Commit

Permalink
[FLINK-12468][yarn] Derive HistoryServer's URL from HistoryServerOptions
Browse files Browse the repository at this point in the history
This commit derives the HistoryServer's URL from the availabe HistoryServerOptions.

This closes apache#8396.
  • Loading branch information
tillrohrmann committed May 14, 2019
1 parent b3f102d commit 792f2a2
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 17 deletions.
5 changes: 0 additions & 5 deletions docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@
<td style="word-wrap: break-word;">5</td>
<td>Time between heartbeats with the ResourceManager in seconds.</td>
</tr>
<tr>
<td><h5>yarn.history.server.address</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The address of Flink HistoryServer.</td>
</tr>
<tr>
<td><h5>yarn.maximum-failed-containers</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) thro
Preconditions.checkNotNull(numFinishedPolls);

this.config = config;
if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
if (HistoryServerUtils.isSSLEnabled(config)) {
LOG.info("Enabling SSL for the history server.");
try {
this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.runtime.net.SSLUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;

/**
* Utility class for the HistoryServer.
*/
public enum HistoryServerUtils {
;

private static final Logger LOG = LoggerFactory.getLogger(HistoryServerUtils.class);

public static boolean isSSLEnabled(Configuration config) {
return config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config);
}

public static Optional<URL> getHistoryServerURL(Configuration configuration) {
final String hostname = getHostname(configuration);

if (hostname != null) {
final String protocol = getProtocol(configuration);
final int port = getPort(configuration);

try {
return Optional.of(new URL(protocol, hostname, port, ""));
} catch (MalformedURLException e) {
LOG.debug("Could not create the HistoryServer's URL from protocol: {}, hostname: {} and port: {}.", protocol, hostname, port, e);
return Optional.empty();
}
} else {
LOG.debug("Not hostname has been specified for the HistoryServer. This indicates that it has not been started.");
return Optional.empty();
}
}

private static int getPort(Configuration configuration) {
return configuration.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
}

@Nullable
private static String getHostname(Configuration configuration) {
return configuration.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
}

private static String getProtocol(Configuration configuration) {
if (isSSLEnabled(configuration)) {
return "https";
} else {
return "http";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import javax.annotation.Nonnull;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link HistoryServerUtils}.
*/
public class HistoryServerUtilsTest extends TestLogger {

private static final String HOSTNAME = "foobar";
private static final int PORT = 1234;

@Test
public void testIsSSLEnabledDefault() {
final Configuration configuration = new Configuration();

assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false));
}

@Test
public void testIsSSLEnabledWithoutRestSSL() {
final Configuration configuration = new Configuration();
configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, true);

assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false));
}

@Test
public void testIsSSLEnabledOnlyRestSSL() {
final Configuration configuration = new Configuration();
configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);

assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false));
}

@Test
public void testIsSSLEnabled() {
final Configuration configuration = new Configuration();
enableSSL(configuration);

assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(true));
}

private void enableSSL(Configuration configuration) {
configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, true);
configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
}

@Test
public void testGetHistoryServerURL() throws MalformedURLException {
final Configuration configuration = createDefaultConfiguration();

final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(configuration);

assertThat(historyServerURL.get(), is(new URL("http", HOSTNAME, PORT, "")));
}

@Test
public void testGetHistoryServerURLWithSSL() throws MalformedURLException {
final Configuration configuration = createDefaultConfiguration();
enableSSL(configuration);

final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(configuration);

assertThat(historyServerURL.get(), is(new URL("https", HOSTNAME, PORT, "")));
}

@Test
public void testGetHistoryServerURLWithoutHS() {
final Configuration configuration = new Configuration();

assertThat(HistoryServerUtils.getHistoryServerURL(configuration).isPresent(), is(false));
}

@Nonnull
private Configuration createDefaultConfiguration() {
final Configuration configuration = new Configuration();
configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, HOSTNAME);
configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, PORT);
return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
Expand All @@ -63,13 +64,15 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -292,9 +295,12 @@ protected void internalDeregisterApplication(
FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);

String historyServerAddress = flinkConfig.getString(YarnConfigOptions.APPLICATION_HISTORY_SERVER_ADDRESS);
final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(flinkConfig);

final String appTrackingUrl = historyServerURL.map(URL::toString).orElse("");

try {
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, historyServerAddress);
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, appTrackingUrl);
} catch (Throwable t) {
log.error("Could not unregister the application master.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,6 @@ public class YarnConfigOptions {
.defaultValue("")
.withDescription("A comma-separated list of tags to apply to the Flink YARN application.");

/**
* The address of Flink HistoryServer.
* This configuration parameter allows setting the appTrackingUrl for finished YARN applications.
*/
public static final ConfigOption<String> APPLICATION_HISTORY_SERVER_ADDRESS =
key("yarn.history.server.address")
.defaultValue("")
.withDescription("The address of Flink HistoryServer.");

// ------------------------------------------------------------------------

/** This class is not meant to be instantiated. */
Expand Down

0 comments on commit 792f2a2

Please sign in to comment.