Skip to content

Commit

Permalink
[FLINK-12547][blob] Add connection and socket timeouts for the blob c…
Browse files Browse the repository at this point in the history
…lient

This closes apache#8484.
  • Loading branch information
sunhaibotb authored and tillrohrmann committed Jun 3, 2019
1 parent f8b78f1 commit de31f49
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 10 deletions.
10 changes: 10 additions & 0 deletions docs/_includes/generated/blob_server_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>blob.client.socket.timeout</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>The socket timeout in milliseconds for the blob client.</td>
</tr>
<tr>
<td><h5>blob.client.connect.timeout</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>The connection timeout in milliseconds for the blob client.</td>
</tr>
<tr>
<td><h5>blob.fetch.backlog</h5></td>
<td style="word-wrap: break-word;">1000</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,20 @@ public class BlobServerOptions {
public static final ConfigOption<Integer> OFFLOAD_MINSIZE = key("blob.offload.minsize")
.defaultValue(1_024 * 1_024) // 1MiB by default
.withDescription("The minimum size for messages to be offloaded to the BlobServer.");

/**
* The socket timeout in milliseconds for the blob client.
*/
public static final ConfigOption<Integer> SO_TIMEOUT =
key("blob.client.socket.timeout")
.defaultValue(300_000)
.withDescription("The socket timeout in milliseconds for the blob client.");

/**
* The connection timeout in milliseconds for the blob client.
*/
public static final ConfigOption<Integer> CONNECT_TIMEOUT =
key("blob.client.connect.timeout")
.defaultValue(0)
.withDescription("The connection timeout in milliseconds for the blob client.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
if (SSLUtils.isInternalSSLEnabled(clientConfig) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
LOG.info("Using ssl connection to the blob server");

socket = SSLUtils.createSSLClientSocketFactory(clientConfig).createSocket(
serverAddress.getAddress(),
serverAddress.getPort());
socket = SSLUtils.createSSLClientSocketFactory(clientConfig).createSocket();
}
else {
socket = new Socket();
socket.connect(serverAddress);
}

socket.connect(serverAddress, clientConfig.getInteger(BlobServerOptions.CONNECT_TIMEOUT));
socket.setSoTimeout(clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT));
}
catch (Exception e) {
BlobUtils.closeSilently(socket, LOG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class BlobClientSslTest extends BlobClientTest {

/** The instance of the SSL BLOB server used during the tests. */
private static BlobServer blobSslServer;
private static TestBlobServer blobSslServer;

/** Instance of a non-SSL BLOB server with SSL-enabled security options. */
private static BlobServer blobNonSslServer;
Expand All @@ -58,7 +58,7 @@ public static void startSSLServer() throws IOException {
Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());

blobSslServer = new BlobServer(config, new VoidBlobStore());
blobSslServer = new TestBlobServer(config, new VoidBlobStore());
blobSslServer.start();

sslClientConfig = config;
Expand Down Expand Up @@ -93,7 +93,7 @@ protected Configuration getBlobClientConfig() {
return sslClientConfig;
}

protected BlobServer getBlobServer() {
protected TestBlobServer getBlobServer() {
return blobSslServer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
Expand All @@ -48,6 +49,8 @@
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand All @@ -62,7 +65,7 @@ public class BlobClientTest extends TestLogger {
private static final int TEST_BUFFER_SIZE = 17 * 1000;

/** The instance of the (non-ssl) BLOB server used during the tests. */
static BlobServer blobServer;
static TestBlobServer blobServer;

/** The blob service (non-ssl) client configuration. */
static Configuration clientConfig;
Expand All @@ -79,7 +82,7 @@ public static void startServer() throws IOException {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());

blobServer = new BlobServer(config, new VoidBlobStore());
blobServer = new TestBlobServer(config, new VoidBlobStore());
blobServer.start();

clientConfig = new Configuration();
Expand Down Expand Up @@ -318,7 +321,7 @@ protected Configuration getBlobClientConfig() {
return clientConfig;
}

protected BlobServer getBlobServer() {
protected TestBlobServer getBlobServer() {
return blobServer;
}

Expand Down Expand Up @@ -487,4 +490,58 @@ private static void uploadJarFile(
validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
}
}


/**
* Tests the socket operation timeout.
*/
@Test
public void testSocketTimeout() {
Configuration clientConfig = getBlobClientConfig();
int oldSoTimeout = clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT);

clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50);
getBlobServer().setBlockingMillis(10_000);

try {
InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());

try (BlobClient client = new BlobClient(serverAddress, clientConfig)) {
client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB));

fail("Should throw an exception.");
} catch (Throwable t) {
assertThat(ExceptionUtils.findThrowable(t, java.net.SocketTimeoutException.class).isPresent(), is(true));
}
} finally {
clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, oldSoTimeout);
getBlobServer().setBlockingMillis(0);
}
}

static class TestBlobServer extends BlobServer {

private volatile long blockingMillis = 0;

TestBlobServer(Configuration config, BlobStore blobStore) throws IOException {
super(config, blobStore);
}

@Override
void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) throws IOException {
if (blockingMillis > 0) {
try {
Thread.sleep(blockingMillis);
} catch (InterruptedException e) {
throw new IOException(e);
}
}

super.getFileInternal(jobId, blobKey, localFile);
}

void setBlockingMillis(long millis) {
this.blockingMillis = millis;
}
}
}

0 comments on commit de31f49

Please sign in to comment.