Skip to content

Commit

Permalink
[hotfix][tests] Fix typo in FutureUtils#retrySuccessfulWithDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 15, 2018
1 parent b384630 commit 8a101ce
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private static void checkTaskManagerMetricAvailability(final RestClient restClie
}

private static <X> X fetchMetric(final SupplierWithException<CompletableFuture<X>, IOException> clientOperation, final Predicate<X> predicate) throws InterruptedException, ExecutionException, TimeoutException {
final CompletableFuture<X> responseFuture = FutureUtils.retrySuccesfulWithDelay(() -> {
final CompletableFuture<X> responseFuture = FutureUtils.retrySuccessfulWithDelay(() -> {
try {
return clientOperation.get();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testJobManagerJMXMetricAccess() throws Exception {
client.setDetached(true);
client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader());

FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> client.getJobStatus(jobGraph.getJobID()),
Time.milliseconds(10),
deadline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ public void close() throws Exception {
// Free cluster resources
clusterClient.cancel(jobId);
// cancel() is non-blocking so do this to make sure the job finished
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobId),
Time.milliseconds(50),
deadline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private static <T> void retryOperationWithDelay(
* @return Future which retries the given operation a given amount of times and delays the retry
* in case the predicate isn't matched
*/
public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
public static <T> CompletableFuture<T> retrySuccessfulWithDelay(
final Supplier<CompletableFuture<T>> operation,
final Time retryDelay,
final Deadline deadline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
// Submit job and wait until running
flink.runDetached(jobGraph);

FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> flink.getJobStatus(jobGraph.getJobID()),
Time.milliseconds(10),
deadline,
Expand Down Expand Up @@ -160,7 +160,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

// wait until the job is canceled
FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> flink.getJobStatus(jobGraph.getJobID()),
Time.milliseconds(10),
deadline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void go() throws Exception {
try {
NotifyingMapper.notifyLatch.await();

FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> {
try {
return CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceF

client.cancel(jobId);

FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> client.getJobStatus(jobId),
Time.milliseconds(50),
Deadline.now().plus(Duration.ofSeconds(30)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO

// now the job should be able to go to RUNNING again and then eventually to FINISHED,
// which it only does if it could successfully restore
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobID),
Time.milliseconds(50),
deadline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public void run() {
}

private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
Time.milliseconds(50L),
org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ static DispatcherGateway retrieveDispatcherGateway(RpcService rpcService, HighAv
}

private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
FutureUtils.retrySuccesfulWithDelay(
FutureUtils.retrySuccessfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(timeout),
Time.milliseconds(50L),
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
Expand All @@ -306,7 +306,7 @@ private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time
}

private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
return FutureUtils.retrySuccesfulWithDelay(
return FutureUtils.retrySuccessfulWithDelay(
CheckedSupplier.unchecked(clusterClient::listJobs),
Time.milliseconds(50L),
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien

clusterClient.submitJob(jobToMigrate, classLoader);

CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccesfulWithDelay(
CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
Time.milliseconds(50),
deadline,
Expand Down Expand Up @@ -152,7 +152,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien

assertNotNull("Could not take savepoint.", savepointPath);

CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccesfulWithDelay(
CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
Time.milliseconds(50),
deadline,
Expand All @@ -173,7 +173,7 @@ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient,

clusterClient.submitJob(jobToRestore, classLoader);

CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobToRestore.getJobID()),
Time.milliseconds(50),
deadline,
Expand Down

0 comments on commit 8a101ce

Please sign in to comment.