Skip to content

Commit

Permalink
[refactor][tests][network] Cleanup PartitionRequestClientFactoryTest
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan authored and AHeise committed Oct 26, 2020
1 parent 086fc01 commit 0184672
Showing 1 changed file with 16 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -131,8 +129,6 @@ public void testNettyClientConnectRetryFailure() throws Exception {

factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));

} catch (Exception e) {
throw e;
} finally {
serverAndClient.client().shutdown();
serverAndClient.server().shutdown();
Expand All @@ -150,24 +146,21 @@ public void testNettyClientConnectRetryMultipleThread() throws Exception {
List<Future<NettyPartitionRequestClient>> futures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
Future<NettyPartitionRequestClient> future = threadPoolExecutor.submit(new Callable<NettyPartitionRequestClient>() {
@Override
public NettyPartitionRequestClient call() {
NettyPartitionRequestClient client = null;
try {
client = factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
} catch (Exception e) {
fail(e.getMessage());
}
return client;
Future<NettyPartitionRequestClient> future = threadPoolExecutor.submit(() -> {
NettyPartitionRequestClient client = null;
try {
client = factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
} catch (Exception e) {
fail(e.getMessage());
}
return client;
});

futures.add(future);
}

futures.forEach(runnableFuture -> {
NettyPartitionRequestClient client = null;
NettyPartitionRequestClient client;
try {
client = runnableFuture.get();
assertNotNull(client);
Expand All @@ -183,7 +176,7 @@ public NettyPartitionRequestClient call() {
}

private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(
return NettyTestUtil.initServerAndClient(
new NettyProtocol(null, null) {

@Override
Expand All @@ -196,17 +189,15 @@ public ChannelHandler[] getClientChannelHandlers () {
return new ChannelHandler[]{mock(NetworkClientHandler.class)};
}
});

return serverAndClient;
}

private static class UnstableNettyClient extends NettyClient {

private NettyClient nettyClient;
private final NettyClient nettyClient;

private int retry;

public UnstableNettyClient(NettyClient nettyClient, int retry) {
UnstableNettyClient(NettyClient nettyClient, int retry) {
super(null);
this.nettyClient = nettyClient;
this.retry = retry;
Expand All @@ -225,7 +216,7 @@ ChannelFuture connect(final InetSocketAddress serverSocketAddress) {

private static class FailingNettyClient extends NettyClient {

public FailingNettyClient() {
FailingNettyClient() {
super(null);
}

Expand All @@ -235,7 +226,7 @@ ChannelFuture connect(final InetSocketAddress serverSocketAddress) {
}
}

private class AwaitingNettyClient extends NettyClient {
private static class AwaitingNettyClient extends NettyClient {
private volatile boolean awaitForInterrupts;
private final NettyClient client;

Expand Down Expand Up @@ -266,14 +257,14 @@ public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
syncOnConnect.countDown();
}
}

private static class UncaughtTestExceptionHandler implements UncaughtExceptionHandler {

private final List<Throwable> errors = new ArrayList<Throwable>(1);
private final List<Throwable> errors = new ArrayList<>(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
Expand Down Expand Up @@ -310,10 +301,6 @@ private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(Netty
}
}

return new Tuple2<NettyServer, NettyClient>(server, client);
}

private static ConnectionID createServerConnectionID(int connectionIndex) throws UnknownHostException {
return new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), SERVER_PORT), connectionIndex);
return new Tuple2<>(server, client);
}
}

0 comments on commit 0184672

Please sign in to comment.