diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 67e49fa28e31f..7475071e23e6e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -25,11 +25,17 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.FlinkException; @@ -43,6 +49,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; /** * Client to interact with a {@link MiniCluster}. 
@@ -50,6 +59,12 @@
 public class MiniClusterClient extends ClusterClient {
 
 	private final MiniCluster miniCluster;
+
+	/** Thread pool backing {@link #scheduledExecutor}; it is shut down in {@link #shutdown()}. */
+	private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, new ExecutorThreadFactory("Flink-MiniClusterClient"));
+
+	/** Adapter around the thread pool that is handed to {@code guardWithSingleRetry}. */
+	private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 
 	public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster) throws Exception {
 		super(configuration, miniCluster.getHighAvailabilityServices(), true);
@@ -57,6 +72,21 @@ public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniClus
 		this.miniCluster = miniCluster;
 	}
 
+	/**
+	 * Shuts down the client and the executor service used for retrying cluster operations.
+	 *
+	 * @throws Exception if {@code super.shutdown()} fails; the executor service is shut down regardless
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		try {
+			super.shutdown();
+		} finally {
+			// Release the retry threads even if the parent shutdown throws.
+			scheduledExecutorService.shutdown();
+		}
+	}
+
 	@Override
 	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
 		if (isDetached()) {
@@ -82,12 +112,13 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 
 	@Override
 	public void cancel(JobID jobId) throws Exception {
-		miniCluster.cancelJob(jobId);
+		// Wait for the returned future so that a failed cancellation is reported to the caller.
+		guardWithSingleRetry(() -> miniCluster.cancelJob(jobId), scheduledExecutor).get();
 	}
 
 	@Override
 	public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
-		return miniCluster.triggerSavepoint(jobId, savepointDirectory, true).get();
+		return guardWithSingleRetry(() -> miniCluster.triggerSavepoint(jobId, savepointDirectory, true), scheduledExecutor).get();
 	}
 
 	@Override
@@ -122,7 +153,7 @@ public Map getAccumulators(JobID jobID, ClassLoader loader) thro
 
 	@Override
 	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
-		return miniCluster.getJobStatus(jobId);
+		return guardWithSingleRetry(() -> miniCluster.getJobStatus(jobId), scheduledExecutor);
 	}
 
 	@Override
@@ -174,4 +205,22 @@ public boolean hasUserJarsInClassPath(List userJarFiles) {
 	enum MiniClusterId {
 		INSTANCE
 	}
+
+	/**
+	 * Delegates to {@link FutureUtils#retryWithDelay} with a retry count of 1, a delay of 500 ms and a
+	 * predicate that accepts {@link FencingTokenException} and {@link AkkaRpcException}.
+	 *
+	 * @param operation supplier that starts the asynchronous operation and returns its future
+	 * @param executor executor passed to {@code retryWithDelay} for scheduling
+	 * @param <T> type of the operation's result
+	 * @return the future returned by {@code retryWithDelay}
+	 */
+	private static <T> CompletableFuture<T> guardWithSingleRetry(Supplier<CompletableFuture<T>> operation, ScheduledExecutor executor) {
+		return FutureUtils.retryWithDelay(
+			operation,
+			1,
+			Time.milliseconds(500),
+			throwable -> throwable instanceof FencingTokenException || throwable instanceof AkkaRpcException,
+			executor);
+	}
 }