Skip to content

Commit

Permalink
[FLINK-7533] Let LeaderGatewayRetriever retry failed gateway retrievals
Browse files Browse the repository at this point in the history
Add test case

Only log LeaderGatewayRetriever exception on Debug log level

Properly fail outdated gateway retrieval operations

This closes apache#4602.
  • Loading branch information
tillrohrmann committed Sep 18, 2017
1 parent 41dba8b commit 51b48f3
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
* @param <T> type of the optional
*/
public class OptionalConsumer<T> {
private Optional<T> optional;
private final Optional<T> optional;

private OptionalConsumer(Optional<T> optional) {
this.optional = optional;
this.optional = Preconditions.checkNotNull(optional);
}

public static <T> OptionalConsumer<T> of(Optional<T> optional) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
public interface WebHandler {

/**
* Paths to register the handler under.
* Returns an array of REST URLs under which this handler can be registered.
*
* @return Array of paths under which the handler wants to be registered
* @return array containing REST URLs under which this handler can be registered.
*/
String[] getPaths();
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public TaskManagerLogHandler(
this.blobView = Preconditions.checkNotNull(blobView, "blobView");
}

@Override
public String[] getPaths() {
switch (fileMode) {
case LOG:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.retriever;

import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.util.FlinkRuntimeException;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,19 +39,23 @@ public interface GatewayRetriever<T extends RpcGateway> {
CompletableFuture<T> getFuture();

/**
* Returns the currently retrieved object if there is such an object. Otherwise
* Returns the currently retrieved gateway if there is such an object. Otherwise
* it returns an empty optional.
*
* @return Optional object to retrieve
* @throws Exception if the future has been completed with an exception
*/
default Optional<T> getNow() throws Exception {
default Optional<T> getNow() {
CompletableFuture<T> leaderFuture = getFuture();
if (leaderFuture != null) {
CompletableFuture<T> currentLeaderFuture = leaderFuture;

if (currentLeaderFuture.isDone()) {
return Optional.of(currentLeaderFuture.get());
if (leaderFuture.isCompletedExceptionally() || leaderFuture.isCancelled()) {
return Optional.empty();
} else if (leaderFuture.isDone()) {
try {
return Optional.of(leaderFuture.get());
} catch (Exception e) {
// this should never happen
throw new FlinkRuntimeException("Unexpected error while accessing the retrieved gateway.", e);
}
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package org.apache.flink.runtime.webmonitor.retriever;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.util.ExceptionUtils;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Retrieves and stores the leading {@link RpcGateway}.
Expand All @@ -31,24 +35,86 @@
*/
public abstract class LeaderGatewayRetriever<T extends RpcGateway> extends LeaderRetriever implements GatewayRetriever<T> {

private volatile CompletableFuture<T> gatewayFuture;
private final AtomicReference<CompletableFuture<T>> atomicGatewayFuture;

private volatile CompletableFuture<T> initialGatewayFuture;

public LeaderGatewayRetriever() {
gatewayFuture = createGateway(getLeaderFuture());
initialGatewayFuture = new CompletableFuture<>();
atomicGatewayFuture = new AtomicReference<>(initialGatewayFuture);
}

@Override
public CompletableFuture<T> getFuture() {
return gatewayFuture;
final CompletableFuture<T> currentGatewayFuture = atomicGatewayFuture.get();

if (currentGatewayFuture.isCompletedExceptionally()) {
try {
currentGatewayFuture.get();
} catch (ExecutionException | InterruptedException executionException) {
String leaderAddress;

try {
Tuple2<String, UUID> leaderAddressSessionId = getLeaderNow()
.orElse(Tuple2.of("unknown address", HighAvailabilityServices.DEFAULT_LEADER_ID));

leaderAddress = leaderAddressSessionId.f0;
} catch (Exception e) {
log.warn("Could not obtain the current leader.", e);
leaderAddress = "unknown leader address";
}

if (log.isDebugEnabled() || log.isTraceEnabled()) {
// only log exceptions on debug or trace level
log.warn(
"Error while retrieving the leader gateway. Retrying to connect to {}.",
leaderAddress,
ExceptionUtils.stripExecutionException(executionException));
} else {
log.warn(
"Error while retrieving the leader gateway. Retrying to connect to {}.",
leaderAddress);
}
}

// we couldn't resolve the gateway --> let's try again
final CompletableFuture<T> newGatewayFuture = createGateway(getLeaderFuture());

// let's check if there was a concurrent createNewFuture call
if (atomicGatewayFuture.compareAndSet(currentGatewayFuture, newGatewayFuture)) {
return newGatewayFuture;
} else {
return atomicGatewayFuture.get();
}
} else {
return atomicGatewayFuture.get();
}
}

@Override
public CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
CompletableFuture<Tuple2<String, UUID>> newFuture = super.createNewFuture();
public void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {
final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);

final CompletableFuture<T> oldGatewayFuture = atomicGatewayFuture.getAndSet(newGatewayFuture);

gatewayFuture = createGateway(newFuture);
// check if the old gateway future was the initial future
if (oldGatewayFuture == initialGatewayFuture) {
// we have to complete it because a caller might wait on the initial future
newGatewayFuture.whenComplete(
(t, throwable) -> {
if (throwable != null) {
oldGatewayFuture.completeExceptionally(throwable);
} else {
oldGatewayFuture.complete(t);
}
});

return newFuture;
// free the initial gateway future
initialGatewayFuture = null;
} else {
// try to cancel old gateway retrieval operation
oldGatewayFuture.cancel(false);
}
}

protected abstract CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,18 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* Retrieves and stores the current leader address.
*/
public class LeaderRetriever implements LeaderRetrievalListener {
protected final Logger log = LoggerFactory.getLogger(getClass());

// False if we have to create a new JobManagerGateway future when being notified
// about a new leader address
private final AtomicBoolean firstTimeUsage;

protected volatile CompletableFuture<Tuple2<String, UUID>> leaderFuture;
private AtomicReference<CompletableFuture<Tuple2<String, UUID>>> atomicLeaderFuture;

public LeaderRetriever() {
firstTimeUsage = new AtomicBoolean(true);
leaderFuture = new CompletableFuture<>();
atomicLeaderFuture = new AtomicReference<>(new CompletableFuture<>());
}

/**
Expand All @@ -55,7 +50,7 @@ public LeaderRetriever() {
* @throws Exception if the leader future has been completed with an exception
*/
public Optional<Tuple2<String, UUID>> getLeaderNow() throws Exception {
CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.leaderFuture;
CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.atomicLeaderFuture.get();
if (leaderFuture != null) {
CompletableFuture<Tuple2<String, UUID>> currentLeaderFuture = leaderFuture;

Expand All @@ -73,25 +68,23 @@ public Optional<Tuple2<String, UUID>> getLeaderNow() throws Exception {
* Returns the future holding the current leader address and leader session id.
*/
public CompletableFuture<Tuple2<String, UUID>> getLeaderFuture() {
return leaderFuture;
return atomicLeaderFuture.get();
}

@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
if (leaderAddress != null && !leaderAddress.equals("")) {
try {
final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture;
final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture = CompletableFuture.completedFuture(Tuple2.of(leaderAddress, leaderSessionID));

if (firstTimeUsage.compareAndSet(true, false)) {
newLeaderFuture = leaderFuture;
} else {
newLeaderFuture = createNewFuture();
leaderFuture = newLeaderFuture;
}
final CompletableFuture<Tuple2<String, UUID>> oldLeaderFuture = atomicLeaderFuture.getAndSet(newLeaderFuture);

log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
if (!oldLeaderFuture.isDone()) {
// initial leader future
oldLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
}

newLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
notifyNewLeaderAddress(newLeaderFuture);
}
catch (Exception e) {
handleError(e);
Expand All @@ -103,10 +96,8 @@ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSes
public void handleError(Exception exception) {
log.error("Received error from LeaderRetrievalService.", exception);

leaderFuture.completeExceptionally(exception);
atomicLeaderFuture.get().completeExceptionally(exception);
}

protected CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
return new CompletableFuture<>();
}
protected void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.retriever;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

/**
 * Tests covering the retry behaviour of the {@link LeaderGatewayRetriever}.
 */
public class LeaderGatewayRetrieverTest extends TestLogger {

	/**
	 * Verifies that a failed gateway retrieval is attempted again on subsequent
	 * accesses until a gateway could eventually be resolved.
	 */
	@Test
	public void testGatewayRetrievalFailures() throws Exception {
		final String address = "localhost";
		final UUID leaderId = UUID.randomUUID();

		final RpcGateway expectedGateway = mock(RpcGateway.class);

		final TestingLeaderGatewayRetriever retriever = new TestingLeaderGatewayRetriever(expectedGateway);
		final TestingLeaderRetrievalService retrievalService = new TestingLeaderRetrievalService();

		retrievalService.start(retriever);

		// obtain the future before any leader has been announced
		final CompletableFuture<RpcGateway> firstFuture = retriever.getFuture();

		// announcing the leader kicks off the first gateway retrieval attempt
		retrievalService.notifyListener(address, leaderId);

		// attempt #1: the previously obtained future must have been failed
		try {
			firstFuture.get();

			fail("The first future should have been failed.");
		} catch (ExecutionException ignored) {
			// expected: the testing retriever fails the first retrieval
		}

		// attempt #2: still failing, hence no gateway is available
		assertFalse(retriever.getNow().isPresent());

		// attempt #3: the retrieval succeeds and yields the gateway
		assertEquals(expectedGateway, retriever.getNow().get());
	}

	/**
	 * {@link LeaderGatewayRetriever} whose first retrieval attempts fail before
	 * it starts returning the configured gateway.
	 */
	private static class TestingLeaderGatewayRetriever extends LeaderGatewayRetriever<RpcGateway> {

		/** Number of leading retrieval attempts which are answered with a failure. */
		private static final int FAILING_ATTEMPTS = 2;

		private final RpcGateway rpcGateway;
		private int retrievalAttempt = 0;

		private TestingLeaderGatewayRetriever(RpcGateway rpcGateway) {
			this.rpcGateway = rpcGateway;
		}

		@Override
		protected CompletableFuture<RpcGateway> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
			// remember which attempt this is and advance the counter for the next call
			final int currentAttempt = retrievalAttempt++;

			if (currentAttempt < FAILING_ATTEMPTS) {
				return FutureUtils.completedExceptionally(new FlinkException("Could not resolve the leader gateway."));
			}

			return CompletableFuture.completedFuture(rpcGateway);
		}
	}
}

0 comments on commit 51b48f3

Please sign in to comment.