Skip to content

Commit

Permalink
[FLINK-7975][QS] Wait for QS client to shutdown.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Dec 6, 2017
1 parent 5221a70 commit 5760677
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,33 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}

/** Shuts down the client. */
public void shutdown() {
client.shutdown();
/**
 * Triggers the shutdown of the client and hands the resulting
 * {@link CompletableFuture} back to the caller.
 *
 * <p>The returned future is the one produced by the underlying client's
 * {@code shutdown()} call: it is completed once the shutdown process has
 * finished, and it is completed exceptionally if the shutdown fails.
 *
 * @return The {@link CompletableFuture} tracking the outcome of the shutdown.
 */
public CompletableFuture<?> shutdownAndHandle() {
    final CompletableFuture<?> shutdownResult = client.shutdown();
    return shutdownResult;
}

/**
 * Shuts down the client and waits until shutdown is completed.
 *
 * <p>If an exception is thrown, a warning is logged containing
 * the exception message.
 *
 * <p>If the calling thread is interrupted while waiting, the wait is
 * abandoned, the same warning is logged, and the thread's interrupt
 * status is restored so that callers can still observe the interruption.
 */
public void shutdownAndWait() {
    try {
        client.shutdown().get();
        LOG.info("The Queryable State Client was shutdown successfully.");
    } catch (InterruptedException e) {
        // get() clears the interrupt status when it throws; set it again so
        // the interruption is not silently swallowed by this method.
        Thread.currentThread().interrupt();
        LOG.warn("The Queryable State Client shutdown failed: ", e);
    } catch (Exception e) {
        LOG.warn("The Queryable State Client shutdown failed: ", e);
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -64,6 +68,8 @@
@Internal
public class Client<REQ extends MessageBody, RESP extends MessageBody> {

private static final Logger LOG = LoggerFactory.getLogger(Client.class);

/** The name of the client. Used for logging and stack traces.*/
private final String clientName;

Expand All @@ -82,8 +88,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
/** Pending connections. */
private final Map<InetSocketAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();

/** Atomic shut down flag. */
private final AtomicBoolean shutDown = new AtomicBoolean();
/** Atomic shut down future. */
private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<>(null);

/**
* Creates a client with the specified number of event loop threads.
Expand Down Expand Up @@ -133,7 +139,7 @@ public String getClientName() {
}

public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
if (shutDown.get()) {
if (clientShutdownFuture.get() != null) {
return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
}

Expand Down Expand Up @@ -166,28 +172,57 @@ public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress
* Shuts down the client and closes all connections.
*
* <p>After a call to this method, all returned futures will be failed.
*
* @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
*/
public void shutdown() {
if (shutDown.compareAndSet(false, true)) {
public CompletableFuture<Void> shutdown() {
final CompletableFuture<Void> newShutdownFuture = new CompletableFuture<>();
if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {

final List<CompletableFuture<Void>> connectionFutures = new ArrayList<>();

for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
conn.getValue().close();
connectionFutures.add(conn.getValue().close());
}
}

for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
conn.getValue().close();
connectionFutures.add(conn.getValue().close());
}
}

if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
if (group != null) {
group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
CompletableFuture.allOf(
connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
).whenComplete((result, throwable) -> {

if (throwable != null) {
LOG.warn("Problem while shutting down the connections at the {}: {}", clientName, throwable);
}
}

if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
if (group != null && !group.isShutdown()) {
group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
.addListener(finished -> {
if (finished.isSuccess()) {
newShutdownFuture.complete(null);
} else {
newShutdownFuture.completeExceptionally(finished.cause());
}
});
} else {
newShutdownFuture.complete(null);
}
} else {
newShutdownFuture.complete(null);
}
});

return newShutdownFuture;
}
return clientShutdownFuture.get();
}

/**
Expand All @@ -209,8 +244,8 @@ private class PendingConnection implements ChannelFutureListener {
/** The established connection after the connect succeeds. */
private EstablishedConnection established;

/** Closed flag. */
private boolean closed;
/** Atomic shut down future. */
private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);

/** Failure cause if something goes wrong. */
private Throwable failureCause;
Expand Down Expand Up @@ -250,7 +285,7 @@ public CompletableFuture<RESP> sendRequest(REQ request) {
synchronized (connectLock) {
if (failureCause != null) {
return FutureUtils.getFailedFuture(failureCause);
} else if (closed) {
} else if (connectionShutdownFuture.get() != null) {
return FutureUtils.getFailedFuture(new ClosedChannelException());
} else {
if (established != null) {
Expand All @@ -272,7 +307,7 @@ public CompletableFuture<RESP> sendRequest(REQ request) {
*/
private void handInChannel(Channel channel) {
synchronized (connectLock) {
if (closed || failureCause != null) {
if (connectionShutdownFuture.get() != null || failureCause != null) {
// Close the channel and we are done. Any queued requests
// are removed on the close/failure call and after that no
// new ones can be enqueued.
Expand Down Expand Up @@ -300,7 +335,7 @@ private void handInChannel(Channel channel) {
// Check shut down for possible race with shut down. We
// don't want any lingering connections after shut down,
// which can happen if we don't check this here.
if (shutDown.get()) {
if (clientShutdownFuture.get() != null) {
if (establishedConnections.remove(serverAddress, established)) {
established.close();
}
Expand All @@ -312,32 +347,40 @@ private void handInChannel(Channel channel) {
/**
* Close the connecting channel with a ClosedChannelException.
*/
private void close() {
close(new ClosedChannelException());
private CompletableFuture<Void> close() {
    // No explicit cause given: delegate to close(Throwable) with a fresh
    // ClosedChannelException and pass its future straight through.
    final Throwable closedCause = new ClosedChannelException();
    return close(closedCause);
}

/**
* Close the connecting channel with an Exception (can be {@code null})
* or forward to the established channel.
*/
private void close(Throwable cause) {
synchronized (connectLock) {
if (!closed) {
private CompletableFuture<Void> close(Throwable cause) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (connectionShutdownFuture.compareAndSet(null, future)) {
synchronized (connectLock) {
if (failureCause == null) {
failureCause = cause;
}

if (established != null) {
established.close();
established.close().whenComplete((result, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
future.complete(null);
}
});
} else {
PendingRequest pending;
while ((pending = queuedRequests.poll()) != null) {
pending.completeExceptionally(cause);
}
future.complete(null);
}
closed = true;
}
}
return connectionShutdownFuture.get();
}

@Override
Expand All @@ -347,7 +390,7 @@ public String toString() {
"serverAddress=" + serverAddress +
", queuedRequests=" + queuedRequests.size() +
", established=" + (established != null) +
", closed=" + closed +
", closed=" + (connectionShutdownFuture.get() != null) +
'}';
}
}
Expand Down Expand Up @@ -383,8 +426,8 @@ private class EstablishedConnection implements ClientHandlerCallback<RESP> {
/** Current request number used to assign unique request IDs. */
private final AtomicLong requestCount = new AtomicLong();

/** Reference to a failure that was reported by the channel. */
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
/** Atomic shut down future. */
private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);

/**
* Creates an established connection with the given channel.
Expand Down Expand Up @@ -412,8 +455,8 @@ private class EstablishedConnection implements ClientHandlerCallback<RESP> {
/**
* Close the channel with a ClosedChannelException.
*/
void close() {
close(new ClosedChannelException());
CompletableFuture<Void> close() {
    // No explicit cause given: delegate to close(Throwable) with a fresh
    // ClosedChannelException and return the future it produces.
    final Throwable closedCause = new ClosedChannelException();
    return close(closedCause);
}

/**
Expand All @@ -422,20 +465,33 @@ void close() {
* @param cause The cause to close the channel with.
* @return Channel close future
*/
private boolean close(Throwable cause) {
if (failureCause.compareAndSet(null, cause)) {
channel.close();
stats.reportInactiveConnection();
private CompletableFuture<Void> close(final Throwable cause) {
final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

for (long requestId : pendingRequests.keySet()) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
if (pending != null && pending.completeExceptionally(cause)) {
stats.reportFailedRequest();
if (connectionShutdownFuture.compareAndSet(null, shutdownFuture)) {
channel.close().addListener(finished -> {
stats.reportInactiveConnection();
for (long requestId : pendingRequests.keySet()) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
if (pending != null && pending.completeExceptionally(cause)) {
stats.reportFailedRequest();
}
}
}
return true;

// when finishing, if netty successfully closes the channel, then the provided exception is used
// as the reason for the closing. If there was something wrong at the netty side, then that exception
// is prioritized over the provided one.
if (finished.isSuccess()) {
shutdownFuture.completeExceptionally(cause);
} else {
LOG.warn("Something went wrong when trying to close connection due to : ", cause);
shutdownFuture.completeExceptionally(finished.cause());
}
});
}
return false;

// in case we had a race condition, return the winner of the race.
return connectionShutdownFuture.get();
}

/**
Expand Down Expand Up @@ -464,16 +520,22 @@ CompletableFuture<RESP> sendRequest(REQ request) {
}
});

// Check failure for possible race. We don't want any lingering
// Check for possible race. We don't want any lingering
// promises after a failure, which can happen if we don't check
// this here. Note that close is treated as a failure as well.
Throwable failure = failureCause.get();
if (failure != null) {
// Remove from pending requests to guard against concurrent
// removal and to make sure that we only count it once as failed.
CompletableFuture<Void> clShutdownFuture = clientShutdownFuture.get();
if (clShutdownFuture != null) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
if (pending != null && pending.completeExceptionally(failure)) {
stats.reportFailedRequest();
if (pending != null) {
clShutdownFuture.whenComplete((ignored, throwable) -> {
if (throwable != null && pending.completeExceptionally(throwable)) {
stats.reportFailedRequest();
} else {
// the client shutdown future normally completes without an exception, and completeExceptionally()
// returns false for an already completed request; in both cases we end up here and make sure
// that the pending request is completed exceptionally.
pending.completeExceptionally(new ClosedChannelException());
}
});
}
}
} catch (Throwable t) {
Expand All @@ -486,27 +548,25 @@ CompletableFuture<RESP> sendRequest(REQ request) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
if (pending != null && pending.complete(response)) {
if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
pending.complete(response);
}
}

@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
if (pending != null && pending.completeExceptionally(cause)) {
if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
pending.completeExceptionally(cause);
}
}

@Override
public void onFailure(Throwable cause) {
if (close(cause)) {
// Remove from established channels, otherwise future
// requests will be handled by this failed channel.
establishedConnections.remove(serverAddress, this);
}
close(cause).handle((cancelled, ignored) -> establishedConnections.remove(serverAddress, this));
}

@Override
Expand All @@ -516,7 +576,6 @@ public String toString() {
", channel=" + channel +
", pendingRequests=" + pendingRequests.size() +
", requestCount=" + requestCount +
", failureCause=" + failureCause +
'}';
}

Expand Down
Loading

0 comments on commit 5760677

Please sign in to comment.