Skip to content

Commit

Permalink
[FLINK-10580] Harden BootstrapTool#startActorSystem
Browse files Browse the repository at this point in the history
Instead of opening a socket to check whether a given port is free, we simply start an
ActorSystem with it and check whether it can bind to this port. This also solves the problem
that port 0 gets resolved to a specific port, which might get taken between closing the
test socket and starting the ActorSystem.
  • Loading branch information
tillrohrmann committed Oct 18, 2018
1 parent 4be1afa commit 7dbe4bb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -157,19 +156,7 @@ public static ActorSystem startActorSystem(
}

while (portsIterator.hasNext()) {
// first, we check if the port is available by opening a socket
// if the actor system fails to start on the port, we try further
ServerSocket availableSocket = NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);

int port;
if (availableSocket == null) {
throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
} else {
port = availableSocket.getLocalPort();
try {
availableSocket.close();
} catch (IOException ignored) {}
}
final int port = portsIterator.next();

try {
return startActorSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,26 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;

import akka.actor.ActorSystem;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -33,7 +49,9 @@
/**
* Tests for {@link BootstrapTools}.
*/
public class BootstrapToolsTest {
public class BootstrapToolsTest extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(BootstrapToolsTest.class);

@Test
public void testSubstituteConfigKey() {
Expand Down Expand Up @@ -305,4 +323,41 @@ public void testShouldNotUpdateTmpDirectoriesInConfigurationIfNoValueConfigured(
BootstrapTools.updateTmpDirectoriesInConfiguration(config, null);
assertEquals(config.getString(CoreOptions.TMP_DIRS), CoreOptions.TMP_DIRS.defaultValue());
}

/**
 * Tests that we can concurrently create multiple {@link ActorSystem}s without port conflicts.
 * This effectively tests that we don't open a socket to check for a port's availability.
 * See FLINK-10580 for more details.
 *
 * @throws Exception if starting or terminating any of the actor systems fails
 */
@Test
public void testConcurrentActorSystemCreation() throws Exception {
	final int concurrentCreations = 10;
	final ExecutorService executorService = Executors.newFixedThreadPool(concurrentCreations);
	final CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrentCreations);

	try {
		final List<CompletableFuture<Void>> actorSystemFutures = IntStream.range(0, concurrentCreations)
			.mapToObj(
				ignored ->
					CompletableFuture.supplyAsync(
						CheckedSupplier.unchecked(() -> {
							// line up all creators so that they start their ActorSystem at the same time
							cyclicBarrier.await();

							return BootstrapTools.startActorSystem(
								new Configuration(),
								"localhost",
								"0",
								LOG);
						}),
						// Run on the dedicated pool which has one thread per barrier party. Without an
						// explicit executor the tasks would run on ForkJoinPool.commonPool(), which may
						// have fewer than concurrentCreations workers; the barrier would then never
						// trip and the test would hang.
						executorService))
			.map(
				// terminate ActorSystems
				actorSystemFuture ->
					actorSystemFuture.thenCompose(AkkaUtils::terminateActorSystem)
			).collect(Collectors.toList());

		FutureUtils.completeAll(actorSystemFutures).get();
	} finally {
		ExecutorUtils.gracefulShutdown(10000L, TimeUnit.MILLISECONDS, executorService);
	}
}
}

0 comments on commit 7dbe4bb

Please sign in to comment.