From 30c4bc847ade8cf0ae5c3ef6a6f8debdf72ddd61 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 12 Jul 2018 17:20:30 +0200 Subject: [PATCH] [FLINK-9313] [security] (part 2) Split SSL configuration into internal (rpc, data transport, blob server) and external (REST) This also uses SSLEngineFactory for all SSLEngine creations. --- docs/_includes/generated/common_section.html | 9 +- .../generated/security_configuration.html | 59 ++- .../generated/task_manager_configuration.html | 2 +- .../configuration/HistoryServerOptions.java | 2 +- .../flink/configuration/SecurityOptions.java | 130 +++++- .../configuration/TaskManagerOptions.java | 2 +- .../flink/mesos/util/MesosArtifactServer.java | 21 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 15 +- .../webmonitor/history/HistoryServer.java | 13 +- .../utils/WebFrontendBootstrap.java | 17 +- .../apache/flink/runtime/blob/BlobClient.java | 3 +- .../apache/flink/runtime/blob/BlobServer.java | 3 +- .../HighAvailabilityServicesUtils.java | 4 +- .../runtime/io/network/netty/NettyClient.java | 25 +- .../runtime/io/network/netty/NettyConfig.java | 51 +-- .../runtime/io/network/netty/NettyServer.java | 13 +- .../flink/runtime/net/SSLEngineFactory.java | 12 +- .../apache/flink/runtime/net/SSLUtils.java | 298 ++++++------ .../runtime/rest/RestClientConfiguration.java | 6 +- .../rest/RestServerEndpointConfiguration.java | 6 +- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 2 +- .../apache/flink/runtime/akka/AkkaUtils.scala | 22 +- .../flink/runtime/blob/BlobClientSslTest.java | 33 +- .../netty/NettyClientServerSslTest.java | 15 +- .../flink/runtime/net/SSLUtilsTest.java | 427 ++++++++++++------ .../rest/RestServerEndpointITCase.java | 18 +- .../flink/runtime/akka/AkkaSslITCase.scala | 10 +- 27 files changed, 791 insertions(+), 427 deletions(-) diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html index 29245c9c6602f..ea881e0e34725 100644 --- a/docs/_includes/generated/common_section.html +++ b/docs/_includes/generated/common_section.html @@ -53,9 +53,14 @@ File system path (URI) where Flink persists metadata in high-availability setups. -
security.ssl.enabled
+
security.ssl.internal.enabled
false - Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules. + Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc). + + +
security.ssl.rest.enabled
+ false + Turns on SSL for external communication via the REST endpoints. diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html index cd682ecaf0f7e..5042cf3df01c1 100644 --- a/docs/_includes/generated/security_configuration.html +++ b/docs/_includes/generated/security_configuration.html @@ -13,9 +13,34 @@ The comma separated list of standard SSL algorithms to be supported. Read more <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites">here</a>. -
security.ssl.enabled
+
security.ssl.internal.enabled
false - Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules. + Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc). + + +
security.ssl.internal.key-password
+ (none) + The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server). + + +
security.ssl.internal.keystore
+ (none) + The Java keystore file with SSL Key and Certificate, to be used Flink's internal endpoints (rpc, data transport, blob server). + + +
security.ssl.internal.keystore-password
+ (none) + The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + + +
security.ssl.internal.truststore
+ (none) + The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server). + + +
security.ssl.internal.truststore-password
+ (none) + The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.key-password
@@ -37,6 +62,36 @@ "TLSv1.2" The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list. + +
security.ssl.rest.enabled
+ false + Turns on SSL for external communication via the REST endpoints. + + +
security.ssl.rest.key-password
+ (none) + The secret to decrypt the key in the keystore for Flink's external REST endpoints. + + +
security.ssl.rest.keystore
+ (none) + The Java keystore file with SSL Key and Certificate, to be used Flink's external REST endpoints. + + +
security.ssl.rest.keystore-password
+ (none) + The secret to decrypt the keystore file for Flink's for Flink's external REST endpoints. + + +
security.ssl.rest.truststore
+ (none) + The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints. + + +
security.ssl.rest.truststore-password
+ (none) + The password to decrypt the truststore for Flink's external REST endpoints. +
security.ssl.truststore
(none) diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html index e780f6e019edf..c18c5d4c36d92 100644 --- a/docs/_includes/generated/task_manager_configuration.html +++ b/docs/_includes/generated/task_manager_configuration.html @@ -35,7 +35,7 @@
taskmanager.data.ssl.enabled
true - Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true + Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true
taskmanager.debug.memory.log
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index 13cdc1e3c8a64..12e7f24697e25 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -80,7 +80,7 @@ public class HistoryServerOptions { /** * Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if - * {@link SecurityOptions#SSL_ENABLED} is enabled. + * {@link SecurityOptions#SSL_REST_ENABLED} is enabled. */ public static final ConfigOption HISTORY_SERVER_WEB_SSL_ENABLED = key("historyserver.web.ssl.enabled") diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index fc7e39159a64d..03ee4f2db6033 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -85,14 +85,40 @@ public class SecurityOptions { // ------------------------------------------------------------------------ /** - * Enable SSL support. + * Enable SSL for internal (rpc, data transport, blob server) and external (HTTP/REST) communication. + * + * @deprecated Use {@link #SSL_INTERNAL_ENABLED} and {@link #SSL_REST_ENABLED} instead. */ - @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY) + @Deprecated public static final ConfigOption SSL_ENABLED = key("security.ssl.enabled") .defaultValue(false) - .withDescription("Turns on SSL for internal network communication. This can be optionally overridden by" + - " flags defined in different transport modules."); + .withDescription("Turns on SSL for internal and external network communication." + + "This can be overridden by 'security.ssl.internal.enabled', 'security.ssl.external.enabled'. " + + "Specific internal components (rpc, data transport, blob server) may optionally override " + + "this through their own settings."); + + /** + * Enable SSL for internal communication (akka rpc, netty data transport, blob server). + */ + @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY) + public static final ConfigOption SSL_INTERNAL_ENABLED = + key("security.ssl.internal.enabled") + .defaultValue(false) + .withDescription("Turns on SSL for internal network communication. " + + "Optionally, specific components may override this through their own settings " + + "(rpc, data transport, REST, etc)."); + + /** + * Enable SSL for external REST endpoints. + */ + @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY) + public static final ConfigOption SSL_REST_ENABLED = + key("security.ssl.rest.enabled") + .defaultValue(false) + .withDescription("Turns on SSL for external communication via the REST endpoints."); + + // ----------------- certificates (internal + external) ------------------- /** * The Java keystore file containing the flink endpoint key and certificate. @@ -135,6 +161,102 @@ public class SecurityOptions { .noDefaultValue() .withDescription("The secret to decrypt the truststore."); + // ----------------------- certificates (internal) ------------------------ + + /** + * For internal SSL, the Java keystore file containing the private key and certificate. + */ + public static final ConfigOption SSL_INTERNAL_KEYSTORE = + key("security.ssl.internal.keystore") + .noDefaultValue() + .withDescription("The Java keystore file with SSL Key and Certificate, " + + "to be used Flink's internal endpoints (rpc, data transport, blob server)."); + + /** + * For internal SSL, the password to decrypt the keystore file containing the certificate. + */ + public static final ConfigOption SSL_INTERNAL_KEYSTORE_PASSWORD = + key("security.ssl.internal.keystore-password") + .noDefaultValue() + .withDescription("The secret to decrypt the keystore file for Flink's " + + "for Flink's internal endpoints (rpc, data transport, blob server)."); + + /** + * For internal SSL, the password to decrypt the private key. + */ + public static final ConfigOption SSL_INTERNAL_KEY_PASSWORD = + key("security.ssl.internal.key-password") + .noDefaultValue() + .withDescription("The secret to decrypt the key in the keystore " + + "for Flink's internal endpoints (rpc, data transport, blob server)."); + + /** + * For internal SSL, the truststore file containing the public CA certificates to verify the ssl peers. + */ + public static final ConfigOption SSL_INTERNAL_TRUSTSTORE = + key("security.ssl.internal.truststore") + .noDefaultValue() + .withDescription("The truststore file containing the public CA certificates to verify the peer " + + "for Flink's internal endpoints (rpc, data transport, blob server)."); + + /** + * For internal SSL, the secret to decrypt the truststore. + */ + public static final ConfigOption SSL_INTERNAL_TRUSTSTORE_PASSWORD = + key("security.ssl.internal.truststore-password") + .noDefaultValue() + .withDescription("The password to decrypt the truststore " + + "for Flink's internal endpoints (rpc, data transport, blob server)."); + + // ----------------------- certificates (external) ------------------------ + + /** + * For external (REST) SSL, the Java keystore file containing the private key and certificate. + */ + public static final ConfigOption SSL_REST_KEYSTORE = + key("security.ssl.rest.keystore") + .noDefaultValue() + .withDescription("The Java keystore file with SSL Key and Certificate, " + + "to be used Flink's external REST endpoints."); + + /** + * For external (REST) SSL, the password to decrypt the keystore file containing the certificate. + */ + public static final ConfigOption SSL_REST_KEYSTORE_PASSWORD = + key("security.ssl.rest.keystore-password") + .noDefaultValue() + .withDescription("The secret to decrypt the keystore file for Flink's " + + "for Flink's external REST endpoints."); + + /** + * For external (REST) SSL, the password to decrypt the private key. + */ + public static final ConfigOption SSL_REST_KEY_PASSWORD = + key("security.ssl.rest.key-password") + .noDefaultValue() + .withDescription("The secret to decrypt the key in the keystore " + + "for Flink's external REST endpoints."); + + /** + * For external (REST) SSL, the truststore file containing the public CA certificates to verify the ssl peers. + */ + public static final ConfigOption SSL_REST_TRUSTSTORE = + key("security.ssl.rest.truststore") + .noDefaultValue() + .withDescription("The truststore file containing the public CA certificates to verify the peer " + + "for Flink's external REST endpoints."); + + /** + * For external (REST) SSL, the secret to decrypt the truststore. + */ + public static final ConfigOption SSL_REST_TRUSTSTORE_PASSWORD = + key("security.ssl.rest.truststore-password") + .noDefaultValue() + .withDescription("The password to decrypt the truststore " + + "for Flink's external REST endpoints."); + + // ------------------------ ssl parameters -------------------------------- + /** * SSL protocol version to be supported. */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 2907f6b153c0d..be76cf8c4dbef 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -116,7 +116,7 @@ public class TaskManagerOptions { key("taskmanager.data.ssl.enabled") .defaultValue(true) .withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" + - " global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true"); + " global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true"); /** * The initial registration backoff between two consecutive registration attempts. The backoff diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index d0d41e29a7d1b..1fa2cd0ec5cf7 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.router.RoutedRequest; import org.apache.flink.runtime.rest.handler.router.Router; @@ -58,7 +59,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.io.File; @@ -104,8 +104,6 @@ public class MesosArtifactServer implements MesosArtifactResolver { private final Map paths = new HashMap<>(); - private final SSLContext serverSSLContext; - public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config) throws Exception { if (configuredPort < 0 || configuredPort > 0xFFFF) { @@ -113,19 +111,20 @@ public MesosArtifactServer(String prefix, String serverHostname, int configuredP } // Config to enable https access to the artifact server - boolean enableSSL = config.getBoolean( + final boolean enableSSL = config.getBoolean( MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) && - SSLUtils.getSSLEnabled(config); + SSLUtils.isRestSSLEnabled(config); + final SSLEngineFactory sslFactory; if (enableSSL) { LOG.info("Enabling ssl for the artifact server"); try { - serverSSLContext = SSLUtils.createSSLServerContext(config); + sslFactory = SSLUtils.createRestServerSSLEngineFactory(config); } catch (Exception e) { throw new IOException("Failed to initialize SSLContext for the artifact server", e); } } else { - serverSSLContext = null; + sslFactory = null; } router = new Router(); @@ -138,10 +137,8 @@ protected void initChannel(SocketChannel ch) { RouterHandler handler = new RouterHandler(router, new HashMap<>()); // SSL should be the first handler in the pipeline - if (serverSSLContext != null) { - SSLEngine sslEngine = serverSSLContext.createSSLEngine(); - SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); - sslEngine.setUseClientMode(false); + if (sslFactory != null) { + SSLEngine sslEngine = sslFactory.createSSLEngine(); ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); } @@ -169,7 +166,7 @@ protected void initChannel(SocketChannel ch) { String address = bindAddress.getAddress().getHostAddress(); int port = bindAddress.getPort(); - String httpProtocol = (serverSSLContext != null) ? "https" : "http"; + String httpProtocol = (sslFactory != null) ? "https" : "http"; baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/"); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 976b08021d578..39c8a3c0d033b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.WebHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; @@ -92,8 +93,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; - import java.io.File; import java.io.IOException; import java.util.UUID; @@ -130,8 +129,6 @@ public class WebRuntimeMonitor implements WebMonitor { /** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */ private final LeaderGatewayRetriever retriever; - private final SSLContext serverSSLContext; - private final CompletableFuture localRestAddress = new CompletableFuture<>(); private final Time timeout; @@ -234,17 +231,17 @@ public WebRuntimeMonitor( // -------------------------------------------------------------------- // Config to enable https access to the web-ui - boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config); - + final SSLEngineFactory sslFactory; + final boolean enableSSL = SSLUtils.isRestSSLEnabled(config) && config.getBoolean(WebOptions.SSL_ENABLED); if (enableSSL) { LOG.info("Enabling ssl for the web frontend"); try { - serverSSLContext = SSLUtils.createSSLServerContext(config); + sslFactory = SSLUtils.createRestServerSSLEngineFactory(config); } catch (Exception e) { throw new IOException("Failed to initialize SSLContext for the web frontend", e); } } else { - serverSSLContext = null; + sslFactory = null; } metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout); @@ -385,7 +382,7 @@ public WebRuntimeMonitor( // add shutdown hook for deleting the directories and remaining temp files on shutdown ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG); - this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config); + this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, sslFactory, configuredAddress, configuredPort, config); localRestAddress.complete(netty.getRestAddress()); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 7e7e92194672f..0891426559153 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -26,6 +26,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.router.Router; @@ -42,8 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; - import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -89,7 +88,7 @@ public class HistoryServer { private final HistoryServerArchiveFetcher archiveFetcher; - private final SSLContext serverSSLContext; + private final SSLEngineFactory serverSSLFactory; private WebFrontendBootstrap netty; private final Object startupShutdownLock = new Object(); @@ -143,15 +142,15 @@ public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) thro Preconditions.checkNotNull(numFinishedPolls); this.config = config; - if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) { LOG.info("Enabling SSL for the history server."); try { - this.serverSSLContext = SSLUtils.createSSLServerContext(config); + this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config); } catch (Exception e) { throw new IOException("Failed to initialize SSLContext for the history server.", e); } } else { - this.serverSSLContext = null; + this.serverSSLFactory = null; } webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS); @@ -231,7 +230,7 @@ void start() throws IOException, InterruptedException { archiveFetcher.start(); - netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLContext, webAddress, webPort, config); + netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLFactory, webAddress, webPort, config); } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java index bec9ea2df4339..672fddb3eec98 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java @@ -20,7 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.rest.handler.router.Router; import org.apache.flink.runtime.rest.handler.router.RouterHandler; import org.apache.flink.runtime.webmonitor.HttpRequestHandler; @@ -40,7 +40,7 @@ import org.slf4j.Logger; -import javax.net.ssl.SSLContext; +import javax.annotation.Nullable; import javax.net.ssl.SSLEngine; import java.io.File; @@ -56,7 +56,6 @@ public class WebFrontendBootstrap { private final Router router; private final Logger log; private final File uploadDir; - private final SSLContext serverSSLContext; private final ServerBootstrap bootstrap; private final Channel serverChannel; private final String restAddress; @@ -65,14 +64,14 @@ public WebFrontendBootstrap( Router router, Logger log, File directory, - SSLContext sslContext, + @Nullable SSLEngineFactory serverSSLFactory, String configuredAddress, int configuredPort, final Configuration config) throws InterruptedException, UnknownHostException { + this.router = Preconditions.checkNotNull(router); this.log = Preconditions.checkNotNull(log); this.uploadDir = directory; - this.serverSSLContext = sslContext; ChannelInitializer initializer = new ChannelInitializer() { @@ -81,10 +80,8 @@ protected void initChannel(SocketChannel ch) { RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>()); // SSL should be the first handler in the pipeline - if (serverSSLContext != null) { - SSLEngine sslEngine = serverSSLContext.createSSLEngine(); - SSLUtils.setSSLVerAndCipherSuites(sslEngine, config); - sslEngine.setUseClientMode(false); + if (serverSSLFactory != null) { + SSLEngine sslEngine = serverSSLFactory.createSSLEngine(); ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); } @@ -129,7 +126,7 @@ protected void initChannel(SocketChannel ch) { this.log.info("Web frontend listening at {}" + ':' + "{}", address, port); - final String protocol = serverSSLContext != null ? "https://" : "http://"; + final String protocol = serverSSLFactory != null ? "https://" : "http://"; this.restAddress = protocol + address + ':' + port; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 2ca250c89f965..01e307eb5b1c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.net.SSLUtils; @@ -84,7 +83,7 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t try { // create an SSL socket if configured - if (clientConfig.getBoolean(SecurityOptions.SSL_ENABLED) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) { + if (SSLUtils.isInternalSSLEnabled(clientConfig) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) { LOG.info("Using ssl connection to the blob server"); socket = SSLUtils.createSSLClientSocketFactory(clientConfig).createSocket( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index ee1d50abe4d80..206be0e05a444 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; @@ -176,7 +175,7 @@ public BlobServer(Configuration config, BlobStore blobStore) throws IOException final Iterator ports = NetUtils.getPortRangeFromString(serverPortRange); final ServerSocketFactory socketFactory; - if (config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) { + if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) { try { socketFactory = SSLUtils.createSSLServerSocketFactory(config); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 918f1f0324845..778d2dbd65d10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.dispatcher.Dispatcher; @@ -31,6 +30,7 @@ import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -103,7 +103,7 @@ public static HighAvailabilityServices createHighAvailabilityServices( "%s must be set", RestOptions.ADDRESS.key()); final int port = configuration.getInteger(RestOptions.PORT); - final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED); + final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration); final String protocol = enableSSL ? "https://" : "http://"; return new StandaloneHaServices( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java index dd4019032000a..ab999d4de8d10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.net.SSLEngineFactory; + import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; @@ -34,9 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; +import javax.annotation.Nullable; import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; + import java.io.IOException; import java.net.InetSocketAddress; @@ -52,7 +54,8 @@ class NettyClient { private Bootstrap bootstrap; - private SSLContext clientSSLContext = null; + @Nullable + private SSLEngineFactory clientSSLFactory; NettyClient(NettyConfig config) { this.config = config; @@ -112,7 +115,7 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws } try { - clientSSLContext = config.createClientSSLContext(); + clientSSLFactory = config.createClientSSLEngineFactory(); } catch (Exception e) { throw new IOException("Failed to initialize SSL Context for the Netty client", e); } @@ -177,18 +180,10 @@ ChannelFuture connect(final InetSocketAddress serverSocketAddress) { public void initChannel(SocketChannel channel) throws Exception { // SSL handler should be added first in the pipeline - if (clientSSLContext != null) { - SSLEngine sslEngine = clientSSLContext.createSSLEngine( + if (clientSSLFactory != null) { + SSLEngine sslEngine = clientSSLFactory.createSSLEngine( serverSocketAddress.getAddress().getCanonicalHostName(), serverSocketAddress.getPort()); - sslEngine.setUseClientMode(true); - - // Enable hostname verification for remote SSL connections - if (!serverSocketAddress.getAddress().isLoopbackAddress()) { - SSLParameters newSSLParameters = sslEngine.getSSLParameters(); - config.setSSLVerifyHostname(newSSLParameters); - sslEngine.setSSLParameters(newSSLParameters); - } channel.pipeline().addLast("ssl", new SslHandler(sslEngine)); } @@ -200,7 +195,7 @@ public void initChannel(SocketChannel channel) throws Exception { return bootstrap.connect(serverSocketAddress); } catch (ChannelException e) { - if ( (e.getCause() instanceof java.net.SocketException && + if ((e.getCause() instanceof java.net.SocketException && e.getCause().getMessage().equals("Too many open files")) || (e.getCause() instanceof ChannelException && e.getCause().getCause() instanceof java.net.SocketException && diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index 18527c46a5347..46cdaabfdc0de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -22,13 +22,14 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.net.SSLUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; +import javax.annotation.Nullable; + import java.net.InetAddress; import static org.apache.flink.util.Preconditions.checkArgument; @@ -89,9 +90,9 @@ enum TransportType { NIO, EPOLL, AUTO } - final static String SERVER_THREAD_GROUP_NAME = "Flink Netty Server"; + static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server"; - final static String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client"; + static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client"; private final InetAddress serverAddress; @@ -189,39 +190,23 @@ public TransportType getTransportType() { } } - public SSLContext createClientSSLContext() throws Exception { - - // Create SSL Context from config - SSLContext clientSSLContext = null; - if (getSSLEnabled()) { - clientSSLContext = SSLUtils.createSSLClientContext(config); - } - - return clientSSLContext; + @Nullable + public SSLEngineFactory createClientSSLEngineFactory() throws Exception { + return getSSLEnabled() ? + SSLUtils.createInternalClientSSLEngineFactory(config) : + null; } - public SSLContext createServerSSLContext() throws Exception { - - // Create SSL Context from config - SSLContext serverSSLContext = null; - if (getSSLEnabled()) { - serverSSLContext = SSLUtils.createSSLServerContext(config); - } - - return serverSSLContext; + @Nullable + public SSLEngineFactory createServerSSLEngineFactory() throws Exception { + return getSSLEnabled() ? + SSLUtils.createInternalServerSSLEngineFactory(config) : + null; } public boolean getSSLEnabled() { return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED) - && SSLUtils.getSSLEnabled(config); - } - - public void setSSLVerAndCipherSuites(SSLEngine engine) { - SSLUtils.setSSLVerAndCipherSuites(engine, config); - } - - public void setSSLVerifyHostname(SSLParameters sslParams) { - SSLUtils.setSSLVerifyHostname(config, sslParams); + && SSLUtils.isInternalSSLEnabled(config); } public boolean isCreditBasedEnabled() { @@ -245,7 +230,7 @@ public String toString() { String def = "use Netty's default"; String man = "manual"; - return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true":"false", + return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false", memorySegmentSize, getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man, getClientNumThreads(), getClientNumThreads() == 0 ? def : man, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java index 81bc50d4ea83a..cc260c6ed9bfe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.util.FatalExitExceptionHandler; import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -36,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.io.IOException; @@ -61,8 +61,6 @@ class NettyServer { private ChannelFuture bindFuture; - private SSLContext serverSSLContext = null; - private InetSocketAddress localAddress; NettyServer(NettyConfig config) { @@ -138,8 +136,9 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws } // SSL related configuration + final SSLEngineFactory sslEngineFactory; try { - serverSSLContext = config.createServerSSLContext(); + sslEngineFactory = config.createServerSSLEngineFactory(); } catch (Exception e) { throw new IOException("Failed to initialize SSL Context for the Netty Server", e); } @@ -151,10 +150,8 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws bootstrap.childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel channel) throws Exception { - if (serverSSLContext != null) { - SSLEngine sslEngine = serverSSLContext.createSSLEngine(); - config.setSSLVerAndCipherSuites(sslEngine); - sslEngine.setUseClientMode(false); + if (sslEngineFactory != null) { + SSLEngine sslEngine = sslEngineFactory.createSSLEngine(); channel.pipeline().addLast("ssl", new SslHandler(sslEngine)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java index 7aca60c46f5cf..d842267978c32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java @@ -49,9 +49,19 @@ public SSLEngineFactory( public SSLEngine createSSLEngine() { final SSLEngine sslEngine = sslContext.createSSLEngine(); + configureSSLEngine(sslEngine); + return sslEngine; + } + + public SSLEngine createSSLEngine(String hostname, int port) { + final SSLEngine sslEngine = sslContext.createSSLEngine(hostname, port); + configureSSLEngine(sslEngine); + return sslEngine; + } + + private void configureSSLEngine(SSLEngine sslEngine) { sslEngine.setEnabledProtocols(enabledProtocols); sslEngine.setEnabledCipherSuites(enabledCipherSuites); sslEngine.setUseClientMode(clientMode); - return sslEngine; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 2bfc0d6004a84..5c9553569efff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -18,58 +18,59 @@ package org.apache.flink.runtime.net; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.SecurityOptions; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.net.ServerSocketFactory; import javax.net.SocketFactory; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLServerSocket; import javax.net.ssl.SSLServerSocketFactory; import javax.net.ssl.TrustManagerFactory; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; import java.net.ServerSocket; +import java.nio.file.Files; import java.security.KeyStore; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Common utilities to manage SSL transport settings. */ public class SSLUtils { - private static final Logger LOG = LoggerFactory.getLogger(SSLUtils.class); + /** + * Checks whether SSL for internal communication (rpc, data transport, blob server) is enabled. + */ + public static boolean isInternalSSLEnabled(Configuration sslConfig) { + @SuppressWarnings("deprecation") + final boolean fallbackFlag = sslConfig.getBoolean(SecurityOptions.SSL_ENABLED); + return sslConfig.getBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, fallbackFlag); + } /** - * Retrieves the global ssl flag from configuration. - * - * @param sslConfig - * The application configuration - * @return true if global ssl flag is set + * Checks whether SSL for the external REST endpoint is enabled. */ - public static boolean getSSLEnabled(Configuration sslConfig) { - return sslConfig.getBoolean(SecurityOptions.SSL_ENABLED); + public static boolean isRestSSLEnabled(Configuration sslConfig) { + @SuppressWarnings("deprecation") + final boolean fallbackFlag = sslConfig.getBoolean(SecurityOptions.SSL_ENABLED); + return sslConfig.getBoolean(SecurityOptions.SSL_REST_ENABLED, fallbackFlag); } /** * Creates a factory for SSL Server Sockets from the given configuration. + * SSL Server Sockets are always part of internal communication. */ public static ServerSocketFactory createSSLServerSocketFactory(Configuration config) throws Exception { - SSLContext sslContext = createSSLServerContext(config); + SSLContext sslContext = createInternalSSLContext(config); if (sslContext == null) { throw new IllegalConfigurationException("SSL is not enabled"); } @@ -83,9 +84,10 @@ public static ServerSocketFactory createSSLServerSocketFactory(Configuration con /** * Creates a factory for SSL Client Sockets from the given configuration. + * SSL Client Sockets are always part of internal communication. */ public static SocketFactory createSSLClientSocketFactory(Configuration config) throws Exception { - SSLContext sslContext = createSSLServerContext(config); + SSLContext sslContext = createInternalSSLContext(config); if (sslContext == null) { throw new IllegalConfigurationException("SSL is not enabled"); } @@ -94,51 +96,71 @@ public static SocketFactory createSSLClientSocketFactory(Configuration config) t } /** - * Creates a {@link SSLEngineFactory} to be used by the Server. - * - * @param config The application configuration. + * Creates a SSLEngineFactory to be used by internal communication server endpoints. */ - public static SSLEngineFactory createServerSSLEngineFactory(final Configuration config) throws Exception { - return createSSLEngineFactory(config, false); + public static SSLEngineFactory createInternalServerSSLEngineFactory(final Configuration config) throws Exception { + SSLContext sslContext = createInternalSSLContext(config); + if (sslContext == null) { + throw new IllegalConfigurationException("SSL is not enabled for internal communication."); + } + + return new SSLEngineFactory( + sslContext, + getEnabledProtocols(config), + getEnabledCipherSuites(config), + false); } /** - * Creates a {@link SSLEngineFactory} to be used by the Client. - * @param config The application configuration. + * Creates a SSLEngineFactory to be used by internal communication client endpoints. */ - public static SSLEngineFactory createClientSSLEngineFactory(final Configuration config) throws Exception { - return createSSLEngineFactory(config, true); - } - - private static SSLEngineFactory createSSLEngineFactory( - final Configuration config, - final boolean clientMode) throws Exception { + public static SSLEngineFactory createInternalClientSSLEngineFactory(final Configuration config) throws Exception { + SSLContext sslContext = createInternalSSLContext(config); + if (sslContext == null) { + throw new IllegalConfigurationException("SSL is not enabled for internal communication."); + } - final SSLContext sslContext = clientMode ? - createSSLClientContext(config) : - createSSLServerContext(config); + return new SSLEngineFactory( + sslContext, + getEnabledProtocols(config), + getEnabledCipherSuites(config), + true); + } - checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key()); + /** + * Creates a {@link SSLEngineFactory} to be used by the REST Servers. + * + * @param config The application configuration. + */ + public static SSLEngineFactory createRestServerSSLEngineFactory(final Configuration config) throws Exception { + SSLContext sslContext = createRestServerSSLContext(config); + if (sslContext == null) { + throw new IllegalConfigurationException("SSL is not enabled for REST endpoints."); + } return new SSLEngineFactory( - sslContext, - getEnabledProtocols(config), - getEnabledCipherSuites(config), - clientMode); + sslContext, + getEnabledProtocols(config), + getEnabledCipherSuites(config), + false); } /** - * Sets SSL version and cipher suites for SSLEngine. + * Creates a {@link SSLEngineFactory} to be used by the REST Clients. * - * @param engine SSLEngine to be handled - * @param config The application configuration - * @deprecated Use {@link #createClientSSLEngineFactory(Configuration)} or - * {@link #createServerSSLEngineFactory(Configuration)}. + * @param config The application configuration. */ - @Deprecated - public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) { - engine.setEnabledProtocols(getEnabledProtocols(config)); - engine.setEnabledCipherSuites(getEnabledCipherSuites(config)); + public static SSLEngineFactory createRestClientSSLEngineFactory(final Configuration config) throws Exception { + SSLContext sslContext = createRestClientSSLContext(config); + if (sslContext == null) { + throw new IllegalConfigurationException("SSL is not enabled for REST endpoints."); + } + + return new SSLEngineFactory( + sslContext, + getEnabledProtocols(config), + getEnabledCipherSuites(config), + true); } private static String[] getEnabledProtocols(final Configuration config) { @@ -152,120 +174,138 @@ private static String[] getEnabledCipherSuites(final Configuration config) { } /** - * Sets SSL options to verify peer's hostname in the certificate. - * - * @param sslConfig - * The application configuration - * @param sslParams - * The SSL parameters that need to be updated + * Creates the SSL Context for internal SSL, if internal SSL is configured. + * For internal SSL, the client and server side configuration are identical, because + * of mutual authentication. */ - public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters sslParams) { + @Nullable + public static SSLContext createInternalSSLContext(Configuration config) throws Exception { + checkNotNull(config, "config"); + + if (!isInternalSSLEnabled(config)) { + return null; + } + String keystoreFilePath = getAndCheckOption( + config, SecurityOptions.SSL_INTERNAL_KEYSTORE, SecurityOptions.SSL_KEYSTORE); + + String keystorePassword = getAndCheckOption( + config, SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD); + + String certPassword = getAndCheckOption( + config, SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD); - Preconditions.checkNotNull(sslConfig); - Preconditions.checkNotNull(sslParams); + String trustStoreFilePath = getAndCheckOption( + config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE); - boolean verifyHostname = sslConfig.getBoolean(SecurityOptions.SSL_VERIFY_HOSTNAME); - if (verifyHostname) { - sslParams.setEndpointIdentificationAlgorithm("HTTPS"); + String trustStorePassword = getAndCheckOption( + config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD); + + String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL); + + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) { + keyStore.load(keyStoreFile, keystorePassword.toCharArray()); + } + + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) { + trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); } + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(keyStore, certPassword.toCharArray()); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); + + SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + return sslContext; } /** - * Creates the SSL Context for the client if SSL is configured. - * - * @param sslConfig - * The application configuration - * @return The SSLContext object which can be used by the ssl transport client - * Returns null if SSL is disabled - * @throws Exception - * Thrown if there is any misconfiguration + * Creates an SSL context for the external REST endpoint server. */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { + public static SSLContext createRestServerSSLContext(Configuration config) throws Exception { + checkNotNull(config, "config"); - Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; + if (!isRestSSLEnabled(config)) { + return null; + } - if (getSSLEnabled(sslConfig)) { - LOG.debug("Creating client SSL context from configuration"); + String keystoreFilePath = getAndCheckOption( + config, SecurityOptions.SSL_REST_KEYSTORE, SecurityOptions.SSL_KEYSTORE); - String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); - String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); - String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + String keystorePassword = getAndCheckOption( + config, SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD); - Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); - Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); + String certPassword = getAndCheckOption( + config, SecurityOptions.SSL_REST_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD); - KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); - trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } - } + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) { + keyStore.load(keyStoreFile, keystorePassword.toCharArray()); + } - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( - TrustManagerFactory.getDefaultAlgorithm()); - trustManagerFactory.init(trustStore); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(keyStore, certPassword.toCharArray()); - clientSSLContext = SSLContext.getInstance(sslProtocolVersion); - clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null); - } + SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion); + sslContext.init(kmf.getKeyManagers(), null, null); - return clientSSLContext; + return sslContext; } /** - * Creates the SSL Context for the server if SSL is configured. - * - * @param sslConfig - * The application configuration - * @return The SSLContext object which can be used by the ssl transport server - * Returns null if SSL is disabled - * @throws Exception - * Thrown if there is any misconfiguration + * Creates an SSL context for clients against the external REST endpoint. */ @Nullable - public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + public static SSLContext createRestClientSSLContext(Configuration config) throws Exception { + checkNotNull(config, "config"); - Preconditions.checkNotNull(sslConfig); - SSLContext serverSSLContext = null; + if (!isRestSSLEnabled(config)) { + return null; + } - if (getSSLEnabled(sslConfig)) { - LOG.debug("Creating server SSL context from configuration"); + String trustStoreFilePath = getAndCheckOption( + config, SecurityOptions.SSL_REST_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE); - String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE); + String trustStorePassword = getAndCheckOption( + config, SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD); - String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD); + String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL); - String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD); + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) { + trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); + } - String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); - Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured."); - Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured."); - Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured."); + SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion); + sslContext.init(null, tmf.getTrustManagers(), null); - KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); - try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) { - ks.load(keyStoreFile, keystorePassword.toCharArray()); - } + return sslContext; + } - // Set up key manager factory to use the server key store - KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(ks, certPassword.toCharArray()); + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ - // Initialize the SSLContext - serverSSLContext = SSLContext.getInstance(sslProtocolVersion); - serverSSLContext.init(kmf.getKeyManagers(), null, null); + private static String getAndCheckOption(Configuration config, ConfigOption primaryOption, ConfigOption fallbackOption) { + String value = config.getString(primaryOption, config.getString(fallbackOption)); + if (value != null) { + return value; + } + else { + throw new IllegalConfigurationException("The config option " + primaryOption.key() + + " or " + fallbackOption.key() + " is missing."); } - - return serverSSLContext; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java index b1591be2504ab..cbd888dd2ce3b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.ConfigurationException; @@ -91,10 +90,9 @@ public static RestClientConfiguration fromConfiguration(Configuration config) th Preconditions.checkNotNull(config); final SSLEngineFactory sslEngineFactory; - final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED); - if (enableSSL) { + if (SSLUtils.isRestSSLEnabled(config)) { try { - sslEngineFactory = SSLUtils.createClientSSLEngineFactory(config); + sslEngineFactory = SSLUtils.createRestClientSSLEngineFactory(config); } catch (Exception e) { throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java index 875230f1552b4..561891f927674 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.net.SSLEngineFactory; import org.apache.flink.runtime.net.SSLUtils; @@ -157,10 +156,9 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co final int port = config.getInteger(RestOptions.PORT); final SSLEngineFactory sslEngineFactory; - final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(WebOptions.SSL_ENABLED); - if (enableSSL) { + if (SSLUtils.isRestSSLEnabled(config)) { try { - sslEngineFactory = SSLUtils.createServerSSLEngineFactory(config); + sslEngineFactory = SSLUtils.createRestServerSSLEngineFactory(config); } catch (Exception e) { throw new ConfigurationException("Failed to initialize SSLEngineFactory for REST server endpoint.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 6ae142b309c9f..982a53668c5ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -130,7 +130,7 @@ public static String getRpcUrl( checkNotNull(config, "config is null"); final boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) && - SSLUtils.getSSLEnabled(config); + SSLUtils.isInternalSSLEnabled(config); return getRpcUrl( hostname, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 57ca9d4160aec..12378e0f196ed 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -336,21 +336,31 @@ object AkkaUtils { val logLifecycleEvents = if (lifecycleEvents) "on" else "off" val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) && - SSLUtils.getSSLEnabled(configuration) + SSLUtils.isInternalSSLEnabled(configuration) val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR) val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off" - val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE) + val akkaSSLKeyStore = configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE, + configuration.getString(SecurityOptions.SSL_KEYSTORE)) - val akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD) + val akkaSSLKeyStorePassword = configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)) - val akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_KEY_PASSWORD) + val akkaSSLKeyPassword = configuration.getString( + SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)) - val akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_TRUSTSTORE) + val akkaSSLTrustStore = configuration.getString( + SecurityOptions.SSL_INTERNAL_TRUSTSTORE, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE)) - val akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD) + val akkaSSLTrustStorePassword = configuration.getString( + SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)) val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index b654ceee6ae92..531f2148f2373 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -20,7 +20,7 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.net.SSLUtilsTest; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -55,40 +55,25 @@ public class BlobClientSslTest extends BlobClientTest { */ @BeforeClass public static void startSSLServer() throws IOException { - Configuration config = new Configuration(); - config.setString(BlobServerOptions.STORAGE_DIRECTORY, - temporarySslFolder.newFolder().getAbsolutePath()); - config.setBoolean(SecurityOptions.SSL_ENABLED, true); - config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); + Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath()); + blobSslServer = new BlobServer(config, new VoidBlobStore()); blobSslServer.start(); - sslClientConfig = new Configuration(); - sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); + sslClientConfig = config; } @BeforeClass public static void startNonSSLServer() throws IOException { - Configuration config = new Configuration(); - config.setString(BlobServerOptions.STORAGE_DIRECTORY, - temporarySslFolder.newFolder().getAbsolutePath()); - config.setBoolean(SecurityOptions.SSL_ENABLED, true); + Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath()); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); + blobNonSslServer = new BlobServer(config, new VoidBlobStore()); blobNonSslServer.start(); - nonSslClientConfig = new Configuration(); - nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - nonSslClientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); - nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); + nonSslClientConfig = config; } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 33e004ebdf670..27506601e7fd6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -151,15 +151,14 @@ public ChannelHandler[] getClientChannelHandlers() { NettyTestUtil.shutdown(serverAndClient); } - private Configuration createSslConfig() throws Exception { - + private static Configuration createSslConfig() throws Exception { Configuration flinkConfig = new Configuration(); - flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); + flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true); + flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore"); + flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password"); + flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password"); + flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore"); + flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password"); return flinkConfig; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index cdc121ad76b5d..e5a03eb5babff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -22,20 +22,17 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.SecurityOptions; -import org.hamcrest.collection.IsArrayContainingInAnyOrder; -import org.junit.Assert; import org.junit.Test; -import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLServerSocket; import java.net.ServerSocket; -import java.util.Arrays; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -45,148 +42,294 @@ */ public class SSLUtilsTest { + public static final String TRUST_STORE_PATH = SSLUtilsTest.class.getResource("/local127.truststore").getFile(); + public static final String KEY_STORE_PATH = SSLUtilsTest.class.getResource("/local127.keystore").getFile(); + public static final String UNTRUSTED_KEY_STORE_PATH = SSLUtilsTest.class.getResource("/local127.keystore").getFile(); + + public static final String TRUST_STORE_PASSWORD = "password"; + public static final String KEY_STORE_PASSWORD = "password"; + public static final String KEY_PASSWORD = "password"; + /** - * Tests if SSL Client Context is created given a valid SSL configuration. + * Tests whether activation of internal / REST SSL evaluates the config flags correctly. */ + @SuppressWarnings("deprecation") @Test - public void testCreateSSLClientContext() throws Exception { + public void checkEnableSSL() { + // backwards compatibility + Configuration oldConf = new Configuration(); + oldConf.setBoolean(SecurityOptions.SSL_ENABLED, true); + assertTrue(SSLUtils.isInternalSSLEnabled(oldConf)); + assertTrue(SSLUtils.isRestSSLEnabled(oldConf)); + + // new options take precedence + Configuration newOptions = new Configuration(); + newOptions.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true); + newOptions.setBoolean(SecurityOptions.SSL_REST_ENABLED, false); + assertTrue(SSLUtils.isInternalSSLEnabled(newOptions)); + assertFalse(SSLUtils.isRestSSLEnabled(newOptions)); + + // new options take precedence + Configuration precedence = new Configuration(); + precedence.setBoolean(SecurityOptions.SSL_ENABLED, true); + precedence.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, false); + precedence.setBoolean(SecurityOptions.SSL_REST_ENABLED, false); + assertFalse(SSLUtils.isInternalSSLEnabled(precedence)); + assertFalse(SSLUtils.isRestSSLEnabled(precedence)); + } - Configuration clientConfig = new Configuration(); - clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); + @Test + public void testSocketFactoriesWhenSslDisabled() throws Exception { + Configuration config = new Configuration(); - SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); - Assert.assertNotNull(clientContext); + try { + SSLUtils.createSSLServerSocketFactory(config); + fail("exception expected"); + } catch (IllegalConfigurationException ignored) {} + + try { + SSLUtils.createSSLClientSocketFactory(config); + fail("exception expected"); + } catch (IllegalConfigurationException ignored) {} } + // ------------------------ REST client -------------------------- + /** - * Tests if SSL Client Context is not created if SSL is not configured. + * Tests if REST Client SSL is created given a valid SSL configuration. */ @Test - public void testCreateSSLClientContextWithSSLDisabled() throws Exception { + public void testRESTClientSSL() throws Exception { + Configuration clientConfig = createRestSslConfigWithTrustStore(); - Configuration clientConfig = new Configuration(); - clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false); - - SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); - Assert.assertNull(clientContext); + SSLEngineFactory ssl = SSLUtils.createRestClientSSLEngineFactory(clientConfig); + assertNotNull(ssl); } /** - * Tests if SSL Client Context creation fails with bad SSL configuration. + * Tests that REST Client SSL Client is not created if SSL is not configured. */ @Test - public void testCreateSSLClientContextMisconfiguration() { - - Configuration clientConfig = new Configuration(); - clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "badpassword"); + public void testRESTClientSSLDisabled() throws Exception { + Configuration clientConfig = createRestSslConfigWithTrustStore(); + clientConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, false); try { - SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); - Assert.fail("SSL client context created even with bad SSL configuration "); - } catch (Exception e) { - // Exception here is valid - } + SSLUtils.createRestClientSSLEngineFactory(clientConfig); + fail("exception expected"); + } catch (IllegalConfigurationException ignored) {} } /** - * Tests if SSL Server Context is created given a valid SSL configuration. + * Tests that REST Client SSL creation fails with bad SSL configuration. */ @Test - public void testCreateSSLServerContext() throws Exception { + public void testRESTClientSSLMissingTrustStore() throws Exception { + Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + config.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "some password"); - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); + try { + SSLUtils.createRestClientSSLEngineFactory(config); + fail("exception expected"); + } catch (IllegalConfigurationException ignored) {} + } - SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); - Assert.assertNotNull(serverContext); + /** + * Tests that REST Client SSL creation fails with bad SSL configuration. + */ + @Test + public void testRESTClientSSLMissingPassword() throws Exception { + Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + config.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_PATH); + + try { + SSLUtils.createRestClientSSLEngineFactory(config); + fail("exception expected"); + } catch (IllegalConfigurationException ignored) {} } /** - * Tests if SSL Server Context is not created if SSL is disabled. + * Tests that REST Client SSL creation fails with bad SSL configuration. */ @Test - public void testCreateSSLServerContextWithSSLDisabled() throws Exception { + public void testRESTClientSSLWrongPassword() throws Exception { + Configuration clientConfig = createRestSslConfigWithTrustStore(); + clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "badpassword"); - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false); + try { + SSLUtils.createRestClientSSLEngineFactory(clientConfig); + fail("exception expected"); + } catch (Exception ignored) {} + } + + // ------------------------ server -------------------------- - SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); - Assert.assertNull(serverContext); + /** + * Tests that REST Server SSL Engine is created given a valid SSL configuration. + */ + @Test + public void testRESTServerSSL() throws Exception { + Configuration serverConfig = createRestSslConfigWithKeyStore(); + + SSLEngineFactory ssl = SSLUtils.createRestServerSSLEngineFactory(serverConfig); + assertNotNull(ssl); } /** - * Tests if SSL Server Context creation fails with bad SSL configuration. + * Tests that REST Server SSL Engine is not created if SSL is disabled. */ @Test - public void testCreateSSLServerContextMisconfiguration() { + public void testRESTServerSSLDisabled() throws Exception { + Configuration serverConfig = createRestSslConfigWithKeyStore(); + serverConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, false); - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "badpassword"); - serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "badpassword"); + try { + SSLUtils.createRestServerSSLEngineFactory(serverConfig); + fail("exception expected"); + } catch (IllegalConfigurationException ignored) {} + } + + /** + * Tests that REST Server SSL Engine creation fails with bad SSL configuration. + */ + @Test + public void testRESTServerSSLBadKeystorePassword() { + Configuration serverConfig = createRestSslConfigWithKeyStore(); + serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "badpassword"); try { - SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); - Assert.fail("SSL server context created even with bad SSL configuration "); - } catch (Exception e) { - // Exception here is valid - } + SSLUtils.createRestServerSSLEngineFactory(serverConfig); + fail("exception expected"); + } catch (Exception ignored) {} } /** - * Tests if SSL Server Context creation fails with bad SSL configuration. + * Tests that REST Server SSL Engine creation fails with bad SSL configuration. */ @Test - public void testCreateSSLServerContextWithMultiProtocols() { + public void testRESTServerSSLBadKeyPassword() { + Configuration serverConfig = createRestSslConfigWithKeyStore(); + serverConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "badpassword"); - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1,TLSv1.2"); + try { + SSLUtils.createRestServerSSLEngineFactory(serverConfig); + fail("exception expected"); + } catch (Exception ignored) {} + } + + // ----------------------- mutual auth contexts -------------------------- + + @Test + public void testInternalSSL() throws Exception { + final Configuration config = createInternalSslConfigWithKeyAndTrustStores(); + assertNotNull(SSLUtils.createInternalServerSSLEngineFactory(config)); + assertNotNull(SSLUtils.createInternalClientSSLEngineFactory(config)); + } + + @Test + public void testInternalSSLDisables() throws Exception { + final Configuration config = createInternalSslConfigWithKeyAndTrustStores(); + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, false); try { - SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); - Assert.fail("SSL server context created even with multiple protocols set "); - } catch (Exception e) { - // Exception here is valid - } + SSLUtils.createInternalServerSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + + try { + SSLUtils.createInternalClientSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} } @Test - public void testSocketFactoriesWhenSslDisables() throws Exception { - Configuration config = new Configuration(); + public void testInternalSSLKeyStoreOnly() throws Exception { + final Configuration config = createInternalSslConfigWithKeyStore(); try { - SSLUtils.createSSLServerSocketFactory(config); + SSLUtils.createInternalServerSSLEngineFactory(config); fail("exception expected"); - } catch (IllegalConfigurationException ignored) {} + } catch (Exception ignored) {} try { - SSLUtils.createSSLClientSocketFactory(config); + SSLUtils.createInternalClientSSLEngineFactory(config); fail("exception expected"); - } catch (IllegalConfigurationException ignored) {} + } catch (Exception ignored) {} + } + + @Test + public void testInternalSSLTrustStoreOnly() throws Exception { + final Configuration config = createInternalSslConfigWithTrustStore(); + + try { + SSLUtils.createInternalServerSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + + try { + SSLUtils.createInternalClientSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + } + + @Test + public void testInternalSSLWrongKeystorePassword() throws Exception { + final Configuration config = createInternalSslConfigWithKeyAndTrustStores(); + config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "badpw"); + + try { + SSLUtils.createInternalServerSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + + try { + SSLUtils.createInternalClientSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} } + @Test + public void testInternalSSLWrongTruststorePassword() throws Exception { + final Configuration config = createInternalSslConfigWithKeyAndTrustStores(); + config.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "badpw"); + + try { + SSLUtils.createInternalServerSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + + try { + SSLUtils.createInternalClientSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + } + + @Test + public void testInternalSSLWrongKeyPassword() throws Exception { + final Configuration config = createInternalSslConfigWithKeyAndTrustStores(); + config.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "badpw"); + + try { + SSLUtils.createInternalServerSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + + try { + SSLUtils.createInternalClientSSLEngineFactory(config); + fail("exception expected"); + } catch (Exception ignored) {} + } + + // -------------------- protocols and cipher suites ----------------------- + /** * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket. */ @Test public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception { - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); + Configuration serverConfig = createInternalSslConfigWithKeyAndTrustStores(); // set custom protocol and cipher suites serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1"); @@ -202,66 +345,98 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio assertEquals(1, protocols.length); assertEquals("TLSv1.1", protocols[0]); assertEquals(2, algorithms.length); - assertThat(algorithms, IsArrayContainingInAnyOrder.arrayContainingInAnyOrder( + assertThat(algorithms, arrayContainingInAnyOrder( "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA256")); } } /** - * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine. + * Tests that {@link SSLEngineFactory} is created correctly. */ @Test - public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { + public void testCreateSSLEngineFactory() throws Exception { + Configuration serverConfig = createInternalSslConfigWithKeyAndTrustStores(); - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); + // set custom protocol and cipher suites serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1"); serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); - SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); - SSLEngine engine = serverContext.createSSLEngine(); + final SSLEngineFactory serverSSLEngineFactory = SSLUtils.createInternalServerSSLEngineFactory(serverConfig); + final SSLEngine sslEngine = serverSSLEngineFactory.createSSLEngine(); - String[] protocols = engine.getEnabledProtocols(); - String[] algorithms = engine.getEnabledCipherSuites(); + assertEquals(1, sslEngine.getEnabledProtocols().length); + assertEquals("TLSv1", sslEngine.getEnabledProtocols()[0]); - Assert.assertNotEquals(1, protocols.length); - Assert.assertNotEquals(2, algorithms.length); + assertEquals(2, sslEngine.getEnabledCipherSuites().length); + assertThat(sslEngine.getEnabledCipherSuites(), arrayContainingInAnyOrder( + "TLS_DHE_RSA_WITH_AES_128_CBC_SHA", "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + } - SSLUtils.setSSLVerAndCipherSuites(engine, serverConfig); - protocols = engine.getEnabledProtocols(); - algorithms = engine.getEnabledCipherSuites(); + // ------------------------------- utils ---------------------------------- - Assert.assertEquals(1, protocols.length); - Assert.assertEquals("TLSv1", protocols[0]); - Assert.assertEquals(2, algorithms.length); - Assert.assertTrue(algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); - Assert.assertTrue(algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + public static Configuration createRestSslConfigWithKeyStore() { + final Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + addRestKeyStoreConfig(config); + return config; } - /** - * Tests that {@link SSLEngineFactory} is created correctly. - */ - @Test - public void testCreateSSLEngineFactory() throws Exception { - Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1"); - serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); + public static Configuration createRestSslConfigWithTrustStore() { + final Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + addRestTrustStoreConfig(config); + return config; + } - final SSLEngineFactory serverSSLEngineFactory = SSLUtils.createServerSSLEngineFactory(serverConfig); - final SSLEngine sslEngine = serverSSLEngineFactory.createSSLEngine(); + public static Configuration createRestSslConfigWithKeyAndTrustStores() { + final Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + addRestKeyStoreConfig(config); + addRestTrustStoreConfig(config); + return config; + } + + public static Configuration createInternalSslConfigWithKeyStore() { + final Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true); + addInternalKeyStoreConfig(config); + return config; + } + + public static Configuration createInternalSslConfigWithTrustStore() { + final Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true); + addInternalTrustStoreConfig(config); + return config; + } + + public static Configuration createInternalSslConfigWithKeyAndTrustStores() { + final Configuration config = new Configuration(); + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true); + addInternalKeyStoreConfig(config); + addInternalTrustStoreConfig(config); + return config; + } + + private static void addRestKeyStoreConfig(Configuration config) { + config.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_PATH); + config.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, KEY_STORE_PASSWORD); + config.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, KEY_PASSWORD); + } + + private static void addRestTrustStoreConfig(Configuration config) { + config.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_PATH); + config.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, TRUST_STORE_PASSWORD); + } + + private static void addInternalKeyStoreConfig(Configuration config) { + config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, KEY_STORE_PATH); + config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, KEY_STORE_PASSWORD); + config.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, KEY_PASSWORD); + } - assertThat( - Arrays.asList(sslEngine.getEnabledProtocols()), - contains("TLSv1")); - assertThat( - Arrays.asList(sslEngine.getEnabledCipherSuites()), - containsInAnyOrder("TLS_DHE_RSA_WITH_AES_128_CBC_SHA", "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + private static void addInternalTrustStoreConfig(Configuration config) { + config.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, TRUST_STORE_PATH); + config.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, TRUST_STORE_PASSWORD); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 93dbb5dc145d1..9251ffe7609e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -138,12 +138,12 @@ public static Collection data() { .getFile()).getAbsolutePath(); final Configuration sslConfig = new Configuration(config); - sslConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - sslConfig.setString(SecurityOptions.SSL_TRUSTSTORE, truststorePath); - sslConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); - sslConfig.setString(SecurityOptions.SSL_KEYSTORE, keystorePath); - sslConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - sslConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); + sslConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + sslConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, truststorePath); + sslConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password"); + sslConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, keystorePath); + sslConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password"); + sslConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password"); return Arrays.asList(new Object[][]{ {config}, {sslConfig} @@ -164,7 +164,7 @@ public void setup() throws Exception { config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath()); defaultSSLContext = SSLContext.getDefault(); - final SSLContext sslClientContext = SSLUtils.createSSLClientContext(config); + final SSLContext sslClientContext = SSLUtils.createRestClientSSLContext(config); if (sslClientContext != null) { SSLContext.setDefault(sslClientContext); } @@ -209,7 +209,9 @@ public void setup() throws Exception { @After public void teardown() throws Exception { - SSLContext.setDefault(defaultSSLContext); + if (defaultSSLContext != null) { + SSLContext.setDefault(defaultSSLContext); + } if (restClient != null) { restClient.shutdown(timeout); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index ebdae313e3b1b..ab62d3eeef715 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -54,7 +54,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setBoolean(SecurityOptions.SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") @@ -81,7 +81,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setBoolean(SecurityOptions.SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") @@ -103,7 +103,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setBoolean(SecurityOptions.SSL_ENABLED, false) + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, false) val cluster = new TestingCluster(config, false) @@ -121,7 +121,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") - config.setBoolean(SecurityOptions.SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore") config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") @@ -143,7 +143,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") - config.setBoolean(SecurityOptions.SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true) val cluster = new TestingCluster(config, false)