Skip to content

Commit

Permalink
[FLINK-9313] [security] (part 2) Split SSL configuration into interna…
Browse files Browse the repository at this point in the history
…l (rpc, data transport, blob server) and external (REST)

This also uses SSLEngineFactory for all SSLEngine creations.
  • Loading branch information
StephanEwen authored and tillrohrmann committed Jul 16, 2018
1 parent 4db63c0 commit 30c4bc8
Show file tree
Hide file tree
Showing 27 changed files with 791 additions and 427 deletions.
9 changes: 7 additions & 2 deletions docs/_includes/generated/common_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,14 @@
<td>File system path (URI) where Flink persists metadata in high-availability setups.</td>
</tr>
<tr>
<td><h5>security.ssl.enabled</h5></td>
<td><h5>security.ssl.internal.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
<td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
</tr>
<tr>
<td><h5>security.ssl.rest.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Turns on SSL for external communication via the REST endpoints.</td>
</tr>
</tbody>
</table>
59 changes: 57 additions & 2 deletions docs/_includes/generated/security_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,34 @@
<td>The comma separated list of standard SSL algorithms to be supported. Read more &#60;a href="http:https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites"&#62;here&#60;/a&#62;.</td>
</tr>
<tr>
<td><h5>security.ssl.enabled</h5></td>
<td><h5>security.ssl.internal.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
<td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
</tr>
<tr>
<td><h5>security.ssl.internal.key-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).</td>
</tr>
<tr>
<td><h5>security.ssl.internal.keystore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The Java keystore file with SSL Key and Certificate, to be used Flink's internal endpoints (rpc, data transport, blob server).</td>
</tr>
<tr>
<td><h5>security.ssl.internal.keystore-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server).</td>
</tr>
<tr>
<td><h5>security.ssl.internal.truststore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).</td>
</tr>
<tr>
<td><h5>security.ssl.internal.truststore-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).</td>
</tr>
<tr>
<td><h5>security.ssl.key-password</h5></td>
Expand All @@ -37,6 +62,36 @@
<td style="word-wrap: break-word;">"TLSv1.2"</td>
<td>The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.</td>
</tr>
<tr>
<td><h5>security.ssl.rest.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Turns on SSL for external communication via the REST endpoints.</td>
</tr>
<tr>
<td><h5>security.ssl.rest.key-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The secret to decrypt the key in the keystore for Flink's external REST endpoints.</td>
</tr>
<tr>
<td><h5>security.ssl.rest.keystore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The Java keystore file with SSL Key and Certificate, to be used Flink's external REST endpoints.</td>
</tr>
<tr>
<td><h5>security.ssl.rest.keystore-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The secret to decrypt the keystore file for Flink's for Flink's external REST endpoints.</td>
</tr>
<tr>
<td><h5>security.ssl.rest.truststore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.</td>
</tr>
<tr>
<td><h5>security.ssl.rest.truststore-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The password to decrypt the truststore for Flink's external REST endpoints.</td>
</tr>
<tr>
<td><h5>security.ssl.truststore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/_includes/generated/task_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<tr>
<td><h5>taskmanager.data.ssl.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true</td>
<td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
</tr>
<tr>
<td><h5>taskmanager.debug.memory.log</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class HistoryServerOptions {

/**
* Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if
* {@link SecurityOptions#SSL_ENABLED} is enabled.
* {@link SecurityOptions#SSL_REST_ENABLED} is enabled.
*/
public static final ConfigOption<Boolean> HISTORY_SERVER_WEB_SSL_ENABLED =
key("historyserver.web.ssl.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,40 @@ public class SecurityOptions {
// ------------------------------------------------------------------------

/**
* Enable SSL support.
* Enable SSL for internal (rpc, data transport, blob server) and external (HTTP/REST) communication.
*
* @deprecated Use {@link #SSL_INTERNAL_ENABLED} and {@link #SSL_REST_ENABLED} instead.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
@Deprecated
public static final ConfigOption<Boolean> SSL_ENABLED =
key("security.ssl.enabled")
.defaultValue(false)
.withDescription("Turns on SSL for internal network communication. This can be optionally overridden by" +
" flags defined in different transport modules.");
.withDescription("Turns on SSL for internal and external network communication." +
"This can be overridden by 'security.ssl.internal.enabled', 'security.ssl.external.enabled'. " +
"Specific internal components (rpc, data transport, blob server) may optionally override " +
"this through their own settings.");

/**
* Enable SSL for internal communication (akka rpc, netty data transport, blob server).
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
public static final ConfigOption<Boolean> SSL_INTERNAL_ENABLED =
key("security.ssl.internal.enabled")
.defaultValue(false)
.withDescription("Turns on SSL for internal network communication. " +
"Optionally, specific components may override this through their own settings " +
"(rpc, data transport, REST, etc).");

/**
* Enable SSL for external REST endpoints.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
public static final ConfigOption<Boolean> SSL_REST_ENABLED =
key("security.ssl.rest.enabled")
.defaultValue(false)
.withDescription("Turns on SSL for external communication via the REST endpoints.");

// ----------------- certificates (internal + external) -------------------

/**
* The Java keystore file containing the flink endpoint key and certificate.
Expand Down Expand Up @@ -135,6 +161,102 @@ public class SecurityOptions {
.noDefaultValue()
.withDescription("The secret to decrypt the truststore.");

// ----------------------- certificates (internal) ------------------------

/**
* For internal SSL, the Java keystore file containing the private key and certificate.
*/
public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE =
key("security.ssl.internal.keystore")
.noDefaultValue()
.withDescription("The Java keystore file with SSL Key and Certificate, " +
"to be used Flink's internal endpoints (rpc, data transport, blob server).");

/**
* For internal SSL, the password to decrypt the keystore file containing the certificate.
*/
public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_PASSWORD =
key("security.ssl.internal.keystore-password")
.noDefaultValue()
.withDescription("The secret to decrypt the keystore file for Flink's " +
"for Flink's internal endpoints (rpc, data transport, blob server).");

/**
* For internal SSL, the password to decrypt the private key.
*/
public static final ConfigOption<String> SSL_INTERNAL_KEY_PASSWORD =
key("security.ssl.internal.key-password")
.noDefaultValue()
.withDescription("The secret to decrypt the key in the keystore " +
"for Flink's internal endpoints (rpc, data transport, blob server).");

/**
* For internal SSL, the truststore file containing the public CA certificates to verify the ssl peers.
*/
public static final ConfigOption<String> SSL_INTERNAL_TRUSTSTORE =
key("security.ssl.internal.truststore")
.noDefaultValue()
.withDescription("The truststore file containing the public CA certificates to verify the peer " +
"for Flink's internal endpoints (rpc, data transport, blob server).");

/**
* For internal SSL, the secret to decrypt the truststore.
*/
public static final ConfigOption<String> SSL_INTERNAL_TRUSTSTORE_PASSWORD =
key("security.ssl.internal.truststore-password")
.noDefaultValue()
.withDescription("The password to decrypt the truststore " +
"for Flink's internal endpoints (rpc, data transport, blob server).");

// ----------------------- certificates (external) ------------------------

/**
* For external (REST) SSL, the Java keystore file containing the private key and certificate.
*/
public static final ConfigOption<String> SSL_REST_KEYSTORE =
key("security.ssl.rest.keystore")
.noDefaultValue()
.withDescription("The Java keystore file with SSL Key and Certificate, " +
"to be used Flink's external REST endpoints.");

/**
* For external (REST) SSL, the password to decrypt the keystore file containing the certificate.
*/
public static final ConfigOption<String> SSL_REST_KEYSTORE_PASSWORD =
key("security.ssl.rest.keystore-password")
.noDefaultValue()
.withDescription("The secret to decrypt the keystore file for Flink's " +
"for Flink's external REST endpoints.");

/**
* For external (REST) SSL, the password to decrypt the private key.
*/
public static final ConfigOption<String> SSL_REST_KEY_PASSWORD =
key("security.ssl.rest.key-password")
.noDefaultValue()
.withDescription("The secret to decrypt the key in the keystore " +
"for Flink's external REST endpoints.");

/**
* For external (REST) SSL, the truststore file containing the public CA certificates to verify the ssl peers.
*/
public static final ConfigOption<String> SSL_REST_TRUSTSTORE =
key("security.ssl.rest.truststore")
.noDefaultValue()
.withDescription("The truststore file containing the public CA certificates to verify the peer " +
"for Flink's external REST endpoints.");

/**
* For external (REST) SSL, the secret to decrypt the truststore.
*/
public static final ConfigOption<String> SSL_REST_TRUSTSTORE_PASSWORD =
key("security.ssl.rest.truststore-password")
.noDefaultValue()
.withDescription("The password to decrypt the truststore " +
"for Flink's external REST endpoints.");

// ------------------------ ssl parameters --------------------------------

/**
* SSL protocol version to be supported.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public class TaskManagerOptions {
key("taskmanager.data.ssl.enabled")
.defaultValue(true)
.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
" global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");

/**
* The initial registration backoff between two consecutive registration attempts. The backoff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.router.Router;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import java.io.File;
Expand Down Expand Up @@ -104,28 +104,27 @@ public class MesosArtifactServer implements MesosArtifactResolver {

private final Map<Path, URL> paths = new HashMap<>();

private final SSLContext serverSSLContext;

public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
throws Exception {
if (configuredPort < 0 || configuredPort > 0xFFFF) {
throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
}

// Config to enable https access to the artifact server
boolean enableSSL = config.getBoolean(
final boolean enableSSL = config.getBoolean(
MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
SSLUtils.getSSLEnabled(config);
SSLUtils.isRestSSLEnabled(config);

final SSLEngineFactory sslFactory;
if (enableSSL) {
LOG.info("Enabling ssl for the artifact server");
try {
serverSSLContext = SSLUtils.createSSLServerContext(config);
sslFactory = SSLUtils.createRestServerSSLEngineFactory(config);
} catch (Exception e) {
throw new IOException("Failed to initialize SSLContext for the artifact server", e);
}
} else {
serverSSLContext = null;
sslFactory = null;
}

router = new Router();
Expand All @@ -138,10 +137,8 @@ protected void initChannel(SocketChannel ch) {
RouterHandler handler = new RouterHandler(router, new HashMap<>());

// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
SSLEngine sslEngine = serverSSLContext.createSSLEngine();
SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
sslEngine.setUseClientMode(false);
if (sslFactory != null) {
SSLEngine sslEngine = sslFactory.createSSLEngine();
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
}

Expand Down Expand Up @@ -169,7 +166,7 @@ protected void initChannel(SocketChannel ch) {
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();

String httpProtocol = (serverSSLContext != null) ? "https" : "http";
String httpProtocol = (sslFactory != null) ? "https" : "http";

baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
Expand Down Expand Up @@ -92,8 +93,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
Expand Down Expand Up @@ -130,8 +129,6 @@ public class WebRuntimeMonitor implements WebMonitor {
/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
private final LeaderGatewayRetriever<JobManagerGateway> retriever;

private final SSLContext serverSSLContext;

private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();

private final Time timeout;
Expand Down Expand Up @@ -234,17 +231,17 @@ public WebRuntimeMonitor(
// --------------------------------------------------------------------

// Config to enable https access to the web-ui
boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);

final SSLEngineFactory sslFactory;
final boolean enableSSL = SSLUtils.isRestSSLEnabled(config) && config.getBoolean(WebOptions.SSL_ENABLED);
if (enableSSL) {
LOG.info("Enabling ssl for the web frontend");
try {
serverSSLContext = SSLUtils.createSSLServerContext(config);
sslFactory = SSLUtils.createRestServerSSLEngineFactory(config);
} catch (Exception e) {
throw new IOException("Failed to initialize SSLContext for the web frontend", e);
}
} else {
serverSSLContext = null;
sslFactory = null;
}
metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);

Expand Down Expand Up @@ -385,7 +382,7 @@ public WebRuntimeMonitor(
// add shutdown hook for deleting the directories and remaining temp files on shutdown
ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG);

this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config);
this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, sslFactory, configuredAddress, configuredPort, config);

localRestAddress.complete(netty.getRestAddress());
}
Expand Down
Loading

0 comments on commit 30c4bc8

Please sign in to comment.