From 51d9a748d40f29b197b2d77fe8aa1b2439737da3 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 4 Sep 2017 17:57:08 +0200 Subject: [PATCH] [FLINK-7580] Automatically retry failed gateway retrievals The LeaderGatewayRetriever implementations, AkkaJobManagerRetriever and the RpcGatewayRetriever, now automatically retry the gateway retrieval operation for a fixed number of times with a retry delay before completing the gateway future with an exception. Retry AkkaJobManagerRetriever Retry RpcGatewayRetriever Add support for fenced components This closes #4643. --- .../MesosApplicationMasterRunner.java | 2 +- .../webmonitor/WebRuntimeMonitorITCase.java | 6 +- .../ActorSystemScheduledExecutorAdapter.java | 198 +++++++++++++++++ .../runtime/rpc/akka/AkkaRpcService.java | 200 ++---------------- .../impl/AkkaJobManagerRetriever.java | 52 +++-- .../retriever/impl/RpcGatewayRetriever.java | 41 +++- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../minicluster/FlinkMiniCluster.scala | 2 +- .../impl/AkkaJobManagerRetrieverTest.java | 3 +- .../impl/RpcGatewayRetrieverTest.java | 13 +- .../yarn/YarnApplicationMasterRunner.java | 2 +- 11 files changed, 305 insertions(+), 216 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index c0a68559501a9..591b7f302b12e 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -305,7 +305,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie webMonitor = BootstrapTools.startWebMonitorIfConfigured( config, highAvailabilityServices, - new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout), + new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), webMonitorTimeout, futureExecutor, diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 10b5ced8823c8..59f8b9d3e8196 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -166,7 +166,7 @@ public void testRedirectToLeader() throws Exception { } for (int i = 0; i < webMonitor.length; i++) { - jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT); + jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT, 0, Time.milliseconds(50L)); webMonitor[i] = new WebRuntimeMonitor( config, @@ -315,7 +315,7 @@ public void testLeaderNotAvailable() throws Exception { config, mock(LeaderRetrievalService.class), mock(BlobView.class), - new AkkaJobManagerRetriever(actorSystem, TIMEOUT), + new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, TIMEOUT), TIMEOUT, TestingUtils.defaultExecutor()); @@ -496,7 +496,7 @@ private WebRuntimeMonitor startWebRuntimeMonitor( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.createBlobStore(), - new AkkaJobManagerRetriever(jmActorSystem, timeout), + new AkkaJobManagerRetriever(jmActorSystem, timeout, 0, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(jmActorSystem, timeout), timeout, TestingUtils.defaultExecutor()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java new file mode 100644 index 0000000000000..918f84bf99a12 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent.akka; + +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorSystem; +import akka.actor.Cancellable; + +import javax.annotation.Nonnull; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.FiniteDuration; + +/** + * Adapter to use a {@link ActorSystem} as a {@link ScheduledExecutor}. + */ +public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor { + + private final ActorSystem actorSystem; + + public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) { + this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); + } + + @Override + @Nonnull + public ScheduledFuture schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) { + ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L); + + Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + @Nonnull + public ScheduledFuture schedule(@Nonnull Callable callable, long delay, @Nonnull TimeUnit unit) { + ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L); + + Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + @Nonnull + public ScheduledFuture scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) { + ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>( + command, + triggerTime(unit.toNanos(initialDelay)), + unit.toNanos(period)); + + Cancellable cancellable = actorSystem.scheduler().schedule( + new FiniteDuration(initialDelay, unit), + new FiniteDuration(period, unit), + scheduledFutureTask, + actorSystem.dispatcher()); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + @Nonnull + public ScheduledFuture scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) { + ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>( + command, + triggerTime(unit.toNanos(initialDelay)), + unit.toNanos(-delay)); + + Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + public void execute(@Nonnull Runnable command) { + actorSystem.dispatcher().execute(command); + } + + private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) { + return actorSystem.scheduler().scheduleOnce( + new FiniteDuration(delay, unit), + runnable, + actorSystem.dispatcher()); + } + + private long now() { + return System.nanoTime(); + } + + private long triggerTime(long delay) { + return now() + delay; + } + + private final class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { + + private long time; + + private final long period; + + private volatile Cancellable cancellable; + + ScheduledFutureTask(Callable callable, long time, long period) { + super(callable); + this.time = time; + this.period = period; + } + + ScheduledFutureTask(Runnable runnable, long time, long period) { + super(runnable, null); + this.time = time; + this.period = period; + } + + public void setCancellable(Cancellable newCancellable) { + this.cancellable = newCancellable; + } + + @Override + public void run() { + if (!isPeriodic()) { + super.run(); + } else if (runAndReset()){ + if (period > 0L) { + time += period; + } else { + cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS); + + // check whether we have been cancelled concurrently + if (isCancelled()) { + cancellable.cancel(); + } else { + time = triggerTime(-period); + } + } + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean result = super.cancel(mayInterruptIfRunning); + + return result && cancellable.cancel(); + } + + @Override + public long getDelay(@Nonnull TimeUnit unit) { + return unit.convert(time - now(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(@Nonnull Delayed o) { + if (o == this) { + return 0; + } + + long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); + return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0; + } + + @Override + public boolean isPeriodic() { + return period != 0L; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 07b334d8d604e..259f0ee45dd5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -18,39 +18,35 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorIdentity; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.actor.Cancellable; -import akka.actor.Identify; -import akka.actor.Props; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.pattern.Patterns; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter; import org.apache.flink.runtime.rpc.FencedMainThreadExecutable; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServer; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; -import org.apache.flink.util.Preconditions; + +import akka.actor.ActorIdentity; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.Identify; +import akka.actor.Props; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; -import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import java.io.Serializable; @@ -60,14 +56,14 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Delayed; import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import scala.Option; +import scala.concurrent.Future; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -122,7 +118,7 @@ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { port = -1; } - internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem); + internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem); } @Override @@ -420,166 +416,4 @@ public C checkedApply(Object obj) throws Exception { return FutureUtils.toJava(resultFuture); } - - /** - * Helper class to expose the internal scheduling logic via a {@link ScheduledExecutor}. - */ - private static final class InternalScheduledExecutorImpl implements ScheduledExecutor { - - private final ActorSystem actorSystem; - - private InternalScheduledExecutorImpl(ActorSystem actorSystem) { - this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); - } - - @Override - @Nonnull - public ScheduledFuture schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) { - ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L); - - Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); - - scheduledFutureTask.setCancellable(cancellable); - - return scheduledFutureTask; - } - - @Override - @Nonnull - public ScheduledFuture schedule(@Nonnull Callable callable, long delay, @Nonnull TimeUnit unit) { - ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L); - - Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); - - scheduledFutureTask.setCancellable(cancellable); - - return scheduledFutureTask; - } - - @Override - @Nonnull - public ScheduledFuture scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) { - ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>( - command, - triggerTime(unit.toNanos(initialDelay)), - unit.toNanos(period)); - - Cancellable cancellable = actorSystem.scheduler().schedule( - new FiniteDuration(initialDelay, unit), - new FiniteDuration(period, unit), - scheduledFutureTask, - actorSystem.dispatcher()); - - scheduledFutureTask.setCancellable(cancellable); - - return scheduledFutureTask; - } - - @Override - @Nonnull - public ScheduledFuture scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) { - ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask<>( - command, - triggerTime(unit.toNanos(initialDelay)), - unit.toNanos(-delay)); - - Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit); - - scheduledFutureTask.setCancellable(cancellable); - - return scheduledFutureTask; - } - - @Override - public void execute(@Nonnull Runnable command) { - actorSystem.dispatcher().execute(command); - } - - private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) { - return actorSystem.scheduler().scheduleOnce( - new FiniteDuration(delay, unit), - runnable, - actorSystem.dispatcher()); - } - - private long now() { - return System.nanoTime(); - } - - private long triggerTime(long delay) { - return now() + delay; - } - - private final class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { - - private long time; - - private final long period; - - private volatile Cancellable cancellable; - - ScheduledFutureTask(Callable callable, long time, long period) { - super(callable); - this.time = time; - this.period = period; - } - - ScheduledFutureTask(Runnable runnable, long time, long period) { - super(runnable, null); - this.time = time; - this.period = period; - } - - public void setCancellable(Cancellable newCancellable) { - this.cancellable = newCancellable; - } - - @Override - public void run() { - if (!isPeriodic()) { - super.run(); - } else if (runAndReset()){ - if (period > 0L) { - time += period; - } else { - cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS); - - // check whether we have been cancelled concurrently - if (isCancelled()) { - cancellable.cancel(); - } else { - time = triggerTime(-period); - } - } - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean result = super.cancel(mayInterruptIfRunning); - - return result && cancellable.cancel(); - } - - @Override - public long getDelay(@Nonnull TimeUnit unit) { - return unit.convert(time - now(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(@Nonnull Delayed o) { - if (o == this) { - return 0; - } - - long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); - return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0; - } - - @Override - public boolean isPeriodic() { - return period != 0L; - } - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java index 121387b689e8c..1702a041aec2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway; @@ -42,31 +44,51 @@ public class AkkaJobManagerRetriever extends LeaderGatewayRetriever= 0, "The number of retries must be >= 0."); + this.retries = retries; + + this.retryDelay = Preconditions.checkNotNull(retryDelay); + + this.scheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem); } @Override protected CompletableFuture createGateway(CompletableFuture> leaderFuture) { - return leaderFuture.thenCompose( - (Tuple2 addressLeaderId) -> - FutureUtils.toJava( - AkkaUtils.getActorRefFuture( - addressLeaderId.f0, - actorSystem, - FutureUtils.toFiniteDuration(timeout))) - .thenApplyAsync( - (ActorRef jobManagerRef) -> { - ActorGateway leaderGateway = new AkkaActorGateway( - jobManagerRef, addressLeaderId.f1); - return new AkkaJobManagerGateway(leaderGateway); - } - )); + return FutureUtils.retryWithDelay( + () -> + leaderFuture.thenCompose( + (Tuple2 addressLeaderId) -> + FutureUtils.toJava( + AkkaUtils.getActorRefFuture( + addressLeaderId.f0, + actorSystem, + FutureUtils.toFiniteDuration(timeout))) + .thenApply( + (ActorRef jobManagerRef) -> { + ActorGateway leaderGateway = new AkkaActorGateway( + jobManagerRef, + addressLeaderId.f1); + + return new AkkaJobManagerGateway(leaderGateway); + } + )), + retries, + retryDelay, + scheduledExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java index 86afc6313d818..6feb919c32ce8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java @@ -18,33 +18,62 @@ package org.apache.flink.runtime.webmonitor.retriever.impl; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.util.Preconditions; +import java.io.Serializable; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; /** * {@link LeaderGatewayRetriever} implementation using the {@link RpcService}. * - * @param type of the gateway to retrieve + * @param type of the fencing token + * @param type of the fenced gateway to retrieve */ -public class RpcGatewayRetriever extends LeaderGatewayRetriever { +public class RpcGatewayRetriever> extends LeaderGatewayRetriever { private final RpcService rpcService; private final Class gatewayType; + private final Function fencingTokenMapper; - public RpcGatewayRetriever(RpcService rpcService, Class gatewayType) { + private final int retries; + private final Time retryDelay; + + public RpcGatewayRetriever( + RpcService rpcService, + Class gatewayType, + Function fencingTokenMapper, + int retries, + Time retryDelay) { this.rpcService = Preconditions.checkNotNull(rpcService); + this.gatewayType = Preconditions.checkNotNull(gatewayType); + this.fencingTokenMapper = Preconditions.checkNotNull(fencingTokenMapper); + + Preconditions.checkArgument(retries >= 0, "The number of retries must be greater or equal to 0."); + this.retries = retries; + this.retryDelay = Preconditions.checkNotNull(retryDelay); } @Override protected CompletableFuture createGateway(CompletableFuture> leaderFuture) { - return leaderFuture.thenCompose( - (Tuple2 addressLeaderTuple) -> rpcService.connect(addressLeaderTuple.f0, gatewayType)); + return FutureUtils.retryWithDelay( + () -> + leaderFuture.thenCompose( + (Tuple2 addressLeaderTuple) -> + rpcService.connect( + addressLeaderTuple.f0, + fencingTokenMapper.apply(addressLeaderTuple.f1), + gatewayType)), + retries, + retryDelay, + rpcService.getScheduledExecutor()); } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f0073db4dca53..a3e1f0809a76f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2229,7 +2229,7 @@ object JobManager { val webServer = WebMonitorUtils.startWebRuntimeMonitor( configuration, highAvailabilityServices, - new AkkaJobManagerRetriever(jobManagerSystem, timeout), + new AkkaJobManagerRetriever(jobManagerSystem, timeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(jobManagerSystem, timeout), timeout, futureExecutor) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 831c02641d90c..c6cbb84e517b3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -399,7 +399,7 @@ abstract class FlinkMiniCluster( WebMonitorUtils.startWebRuntimeMonitor( config, highAvailabilityServices, - new AkkaJobManagerRetriever(actorSystem, flinkTimeout), + new AkkaJobManagerRetriever(actorSystem, flinkTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, flinkTimeout), flinkTimeout, actorSystem.dispatcher) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index d02f3ef25a683..5d01087a10990 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java @@ -69,7 +69,7 @@ public static void teardown() { */ @Test public void testAkkaJobManagerRetrieval() throws Exception { - AkkaJobManagerRetriever akkaJobManagerRetriever = new AkkaJobManagerRetriever(actorSystem, timeout); + AkkaJobManagerRetriever akkaJobManagerRetriever = new AkkaJobManagerRetriever(actorSystem, timeout, 0, Time.milliseconds(0L)); TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); CompletableFuture gatewayFuture = akkaJobManagerRetriever.getFuture(); @@ -101,5 +101,4 @@ public void testAkkaJobManagerRetrieval() throws Exception { } } } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java index 1ca3918eb9973..847250cf29e1e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java @@ -19,9 +19,10 @@ package org.apache.flink.runtime.webmonitor.retriever.impl; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -36,6 +37,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -72,7 +74,7 @@ public void testRpcGatewayRetrieval() throws Exception { final String expectedValue2 = "barfoo"; final UUID leaderSessionId = UUID.randomUUID(); - RpcGatewayRetriever gatewayRetriever = new RpcGatewayRetriever<>(rpcService, DummyGateway.class); + RpcGatewayRetriever gatewayRetriever = new RpcGatewayRetriever<>(rpcService, DummyGateway.class, Function.identity(), 0, Time.milliseconds(0L)); TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint1", expectedValue); DummyRpcEndpoint dummyRpcEndpoint2 = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint2", expectedValue2); @@ -115,7 +117,7 @@ public void testRpcGatewayRetrieval() throws Exception { /** * Testing RpcGateway. */ - public interface DummyGateway extends RpcGateway { + public interface DummyGateway extends FencedRpcGateway { CompletableFuture foobar(@RpcTimeout Time timeout); } @@ -132,5 +134,10 @@ protected DummyRpcEndpoint(RpcService rpcService, String endpointId, String valu public CompletableFuture foobar(Time timeout) { return CompletableFuture.completedFuture(value); } + + @Override + public UUID getFencingToken() { + return HighAvailabilityServices.DEFAULT_LEADER_ID; + } } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 913090136925d..6846ba69ab520 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -366,7 +366,7 @@ protected int runApplicationMaster(Configuration config) { webMonitor = BootstrapTools.startWebMonitorIfConfigured( config, highAvailabilityServices, - new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout), + new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), webMonitorTimeout, futureExecutor,