Skip to content

Commit

Permalink
[FLINK-9314] [security] (part 4) Add mutual authentication for intern…
Browse files Browse the repository at this point in the history
…al Netty and Blob Server connections

This closes apache#6326.
  • Loading branch information
StephanEwen authored and tillrohrmann committed Jul 16, 2018
1 parent 3aeb00f commit a502f82
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ public boolean isCreditBasedEnabled() {
return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}

public Configuration getConfig() {
return config;
}

@Override
public String toString() {
String format = "NettyConfig [" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,20 @@ public class SSLEngineFactory {

private final boolean clientMode;

final boolean clientAuthentication;

public SSLEngineFactory(
final SSLContext sslContext,
final String[] enabledProtocols,
final String[] enabledCipherSuites,
final boolean clientMode) {
final boolean clientMode,
final boolean clientAuthentication) {

this.sslContext = requireNonNull(sslContext, "sslContext must not be null");
this.enabledProtocols = requireNonNull(enabledProtocols, "enabledProtocols must not be null");
this.enabledCipherSuites = requireNonNull(enabledCipherSuites, "cipherSuites must not be null");
this.clientMode = clientMode;
this.clientAuthentication = clientAuthentication;
}

public SSLEngine createSSLEngine() {
Expand All @@ -63,5 +68,8 @@ private void configureSSLEngine(SSLEngine sslEngine) {
sslEngine.setEnabledProtocols(enabledProtocols);
sslEngine.setEnabledCipherSuites(enabledCipherSuites);
sslEngine.setUseClientMode(clientMode);
if (!clientMode) {
sslEngine.setNeedClientAuth(clientAuthentication);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public static SSLEngineFactory createInternalServerSSLEngineFactory(final Config
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
false);
false,
true);
}

/**
Expand All @@ -124,6 +125,7 @@ public static SSLEngineFactory createInternalClientSSLEngineFactory(final Config
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
true,
true);
}

Expand All @@ -142,6 +144,7 @@ public static SSLEngineFactory createRestServerSSLEngineFactory(final Configurat
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
false,
false);
}

Expand All @@ -160,7 +163,8 @@ public static SSLEngineFactory createRestClientSSLEngineFactory(final Configurat
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
true);
true,
false);
}

private static String[] getEnabledProtocols(final Configuration config) {
Expand Down Expand Up @@ -352,6 +356,7 @@ public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddr
private void configureServerSocket(SSLServerSocket socket) {
socket.setEnabledProtocols(protocols);
socket.setEnabledCipherSuites(cipherSuites);
socket.setNeedClientAuth(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
Expand All @@ -35,31 +37,20 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Tests for the SSL connection between Netty Server and Client used for the
* data plane.
*/
public class NettyClientServerSslTest {

/**
* Verify valid ssl configuration and connection.
*/
@Test
public void testValidSslConnection() throws Exception {
NettyProtocol protocol = new NettyProtocol(null, null, true) {
@Override
public ChannelHandler[] getServerChannelHandlers() {
return new ChannelHandler[0];
}

@Override
public ChannelHandler[] getClientChannelHandlers() {
return new ChannelHandler[0];
}
};

NettyConfig nettyConfig = new NettyConfig(
InetAddress.getLoopbackAddress(),
NetUtils.getAvailablePort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
createSslConfig());
NettyProtocol protocol = new NoOpProtocol();

NettyConfig nettyConfig = createNettyConfig(createSslConfig());

NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);

Expand All @@ -77,28 +68,13 @@ public ChannelHandler[] getClientChannelHandlers() {
*/
@Test
public void testInvalidSslConfiguration() throws Exception {
NettyProtocol protocol = new NettyProtocol(null, null, true) {
@Override
public ChannelHandler[] getServerChannelHandlers() {
return new ChannelHandler[0];
}

@Override
public ChannelHandler[] getClientChannelHandlers() {
return new ChannelHandler[0];
}
};
NettyProtocol protocol = new NoOpProtocol();

Configuration config = createSslConfig();
// Modify the keystore password to an incorrect one
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword");
config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");

NettyConfig nettyConfig = new NettyConfig(
InetAddress.getLoopbackAddress(),
NetUtils.getAvailablePort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
config);
NettyConfig nettyConfig = createNettyConfig(config);

NettyTestUtil.NettyServerAndClient serverAndClient = null;
try {
Expand All @@ -116,29 +92,14 @@ public ChannelHandler[] getClientChannelHandlers() {
*/
@Test
public void testSslHandshakeError() throws Exception {
NettyProtocol protocol = new NettyProtocol(null, null, true) {
@Override
public ChannelHandler[] getServerChannelHandlers() {
return new ChannelHandler[0];
}

@Override
public ChannelHandler[] getClientChannelHandlers() {
return new ChannelHandler[0];
}
};
NettyProtocol protocol = new NoOpProtocol();

Configuration config = createSslConfig();

// Use a server certificate which is not present in the truststore
config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore");
config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");

NettyConfig nettyConfig = new NettyConfig(
InetAddress.getLoopbackAddress(),
NetUtils.getAvailablePort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
config);
NettyConfig nettyConfig = createNettyConfig(config);

NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);

Expand All @@ -151,14 +112,60 @@ public ChannelHandler[] getClientChannelHandlers() {
NettyTestUtil.shutdown(serverAndClient);
}

private static Configuration createSslConfig() throws Exception {
Configuration flinkConfig = new Configuration();
flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore");
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password");
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password");
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore");
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password");
return flinkConfig;
@Test
public void testClientUntrustedCertificate() throws Exception {
final Configuration serverConfig = createSslConfig();
final Configuration clientConfig = createSslConfig();

// give the client a different keystore / certificate
clientConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");

final NettyConfig nettyServerConfig = createNettyConfig(serverConfig);
final NettyConfig nettyClientConfig = createNettyConfig(clientConfig);

final NettyBufferPool bufferPool = new NettyBufferPool(1);
final NettyProtocol protocol = new NoOpProtocol();

final NettyServer server = NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
final NettyClient client = NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);

final Channel ch = NettyTestUtil.connect(serverAndClient);
ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());

// Attempting to write data over ssl should fail
assertFalse(ch.writeAndFlush("test").await().isSuccess());

NettyTestUtil.shutdown(serverAndClient);
}

private static Configuration createSslConfig() {
return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
}

private static NettyConfig createNettyConfig(Configuration config) {
return new NettyConfig(
InetAddress.getLoopbackAddress(),
NetUtils.getAvailablePort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
config);
}

private static final class NoOpProtocol extends NettyProtocol {

NoOpProtocol() {
super(null, null, true);
}

@Override
public ChannelHandler[] getServerChannelHandlers() {
return new ChannelHandler[0];
}

@Override
public ChannelHandler[] getClientChannelHandlers() {
return new ChannelHandler[0];
}
}
}

0 comments on commit a502f82

Please sign in to comment.