Skip to content

Commit

Permalink
[FLINK-8887][tests] Add single retry in MiniClusterClient
Browse files Browse the repository at this point in the history
This closes apache#5657.
  • Loading branch information
zentol committed Mar 9, 2018
1 parent 4dcc492 commit 94e959f
Showing 1 changed file with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
Expand All @@ -43,20 +49,31 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

/**
* Client to interact with a {@link MiniCluster}.
*/
public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> {

private final MiniCluster miniCluster;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, new ExecutorThreadFactory("Flink-MiniClusterClient"));
private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(scheduledExecutorService);

public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster) throws Exception {
super(configuration, miniCluster.getHighAvailabilityServices(), true);

this.miniCluster = miniCluster;
}

@Override
public void shutdown() throws Exception {
super.shutdown();
scheduledExecutorService.shutdown();
}

@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
if (isDetached()) {
Expand All @@ -82,12 +99,12 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)

@Override
public void cancel(JobID jobId) throws Exception {
miniCluster.cancelJob(jobId);
guardWithSingleRetry(() -> miniCluster.cancelJob(jobId), scheduledExecutor);
}

@Override
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
return miniCluster.triggerSavepoint(jobId, savepointDirectory, true).get();
return guardWithSingleRetry(() -> miniCluster.triggerSavepoint(jobId, savepointDirectory, true), scheduledExecutor).get();
}

@Override
Expand Down Expand Up @@ -122,7 +139,7 @@ public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) thro

@Override
public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
return miniCluster.getJobStatus(jobId);
return guardWithSingleRetry(() -> miniCluster.getJobStatus(jobId), scheduledExecutor);
}

@Override
Expand Down Expand Up @@ -174,4 +191,13 @@ public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
enum MiniClusterId {
INSTANCE
}

private static <X> CompletableFuture<X> guardWithSingleRetry(Supplier<CompletableFuture<X>> operation, ScheduledExecutor executor) {
return FutureUtils.retryWithDelay(
operation,
1,
Time.milliseconds(500),
throwable -> throwable instanceof FencingTokenException || throwable instanceof AkkaRpcException,
executor);
}
}

0 comments on commit 94e959f

Please sign in to comment.