Skip to content

Commit

Permalink
[FLINK-7580] Automatically retry failed gateway retrievals
Browse files Browse the repository at this point in the history
The LeaderGatewayRetriever implementations, AkkaJobManagerRetriever and the
RpcGatewayRetriever, now automatically retry the gateway retrieval operation
for a fixed number of times with a retry delay before completing the gateway
future with an exception.

Retry AkkaJobManagerRetriever

Retry RpcGatewayRetriever

Add support for fenced components

This closes apache#4643.
  • Loading branch information
tillrohrmann committed Sep 18, 2017
1 parent 51b48f3 commit 51d9a74
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
webMonitor = BootstrapTools.startWebMonitorIfConfigured(
config,
highAvailabilityServices,
new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
webMonitorTimeout,
futureExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testRedirectToLeader() throws Exception {
}

for (int i = 0; i < webMonitor.length; i++) {
jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT);
jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT, 0, Time.milliseconds(50L));

webMonitor[i] = new WebRuntimeMonitor(
config,
Expand Down Expand Up @@ -315,7 +315,7 @@ public void testLeaderNotAvailable() throws Exception {
config,
mock(LeaderRetrievalService.class),
mock(BlobView.class),
new AkkaJobManagerRetriever(actorSystem, TIMEOUT),
new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)),
new AkkaQueryServiceRetriever(actorSystem, TIMEOUT),
TIMEOUT,
TestingUtils.defaultExecutor());
Expand Down Expand Up @@ -496,7 +496,7 @@ private WebRuntimeMonitor startWebRuntimeMonitor(
config,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
new AkkaJobManagerRetriever(jmActorSystem, timeout),
new AkkaJobManagerRetriever(jmActorSystem, timeout, 0, Time.milliseconds(50L)),
new AkkaQueryServiceRetriever(jmActorSystem, timeout),
timeout,
TestingUtils.defaultExecutor());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.concurrent.akka;

import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorSystem;
import akka.actor.Cancellable;

import javax.annotation.Nonnull;

import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;

/**
* Adapter to use a {@link ActorSystem} as a {@link ScheduledExecutor}.
*/
public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

private final ActorSystem actorSystem;

public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
}

@Override
@Nonnull
public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

scheduledFutureTask.setCancellable(cancellable);

return scheduledFutureTask;
}

@Override
@Nonnull
public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

scheduledFutureTask.setCancellable(cancellable);

return scheduledFutureTask;
}

@Override
@Nonnull
public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
command,
triggerTime(unit.toNanos(initialDelay)),
unit.toNanos(period));

Cancellable cancellable = actorSystem.scheduler().schedule(
new FiniteDuration(initialDelay, unit),
new FiniteDuration(period, unit),
scheduledFutureTask,
actorSystem.dispatcher());

scheduledFutureTask.setCancellable(cancellable);

return scheduledFutureTask;
}

@Override
@Nonnull
public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
command,
triggerTime(unit.toNanos(initialDelay)),
unit.toNanos(-delay));

Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

scheduledFutureTask.setCancellable(cancellable);

return scheduledFutureTask;
}

@Override
public void execute(@Nonnull Runnable command) {
actorSystem.dispatcher().execute(command);
}

private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
return actorSystem.scheduler().scheduleOnce(
new FiniteDuration(delay, unit),
runnable,
actorSystem.dispatcher());
}

private long now() {
return System.nanoTime();
}

private long triggerTime(long delay) {
return now() + delay;
}

private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

private long time;

private final long period;

private volatile Cancellable cancellable;

ScheduledFutureTask(Callable<V> callable, long time, long period) {
super(callable);
this.time = time;
this.period = period;
}

ScheduledFutureTask(Runnable runnable, long time, long period) {
super(runnable, null);
this.time = time;
this.period = period;
}

public void setCancellable(Cancellable newCancellable) {
this.cancellable = newCancellable;
}

@Override
public void run() {
if (!isPeriodic()) {
super.run();
} else if (runAndReset()){
if (period > 0L) {
time += period;
} else {
cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

// check whether we have been cancelled concurrently
if (isCancelled()) {
cancellable.cancel();
} else {
time = triggerTime(-period);
}
}
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);

return result && cancellable.cancel();
}

@Override
public long getDelay(@Nonnull TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(@Nonnull Delayed o) {
if (o == this) {
return 0;
}

long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
}

@Override
public boolean isPeriodic() {
return period != 0L;
}
}
}
Loading

0 comments on commit 51d9a74

Please sign in to comment.