diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java index 8b0cd3310969d..28a21354e5d27 100644 --- a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java +++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java @@ -140,7 +140,7 @@ private static void checkTaskManagerMetricAvailability(final RestClient restClie } private static X fetchMetric(final SupplierWithException, IOException> clientOperation, final Predicate predicate) throws InterruptedException, ExecutionException, TimeoutException { - final CompletableFuture responseFuture = FutureUtils.retrySuccesfulWithDelay(() -> { + final CompletableFuture responseFuture = FutureUtils.retrySuccessfulWithDelay(() -> { try { return clientOperation.get(); } catch (IOException e) { diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 915c02b644d8f..036796c582d45 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -107,7 +107,7 @@ public void testJobManagerJMXMetricAccess() throws Exception { client.setDetached(true); client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader()); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> client.getJobStatus(jobGraph.getJobID()), Time.milliseconds(10), deadline, diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index 83976f16094a4..eb6b3ef8c50e8 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -1273,7 +1273,7 @@ public void close() throws Exception { // Free cluster resources clusterClient.cancel(jobId); // cancel() is non-blocking so do this to make sure the job finished - CompletableFuture jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture jobStatusFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobId), Time.milliseconds(50), deadline, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 7392030496417..0f36a3af141db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -239,7 +239,7 @@ private static void retryOperationWithDelay( * @return Future which retries the given operation a given amount of times and delays the retry * in case the predicate isn't matched */ - public static CompletableFuture retrySuccesfulWithDelay( + public static CompletableFuture retrySuccessfulWithDelay( final Supplier> operation, final Time retryDelay, final Deadline deadline, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 14aecc700ad2b..ddae8462539ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -109,7 +109,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { // Submit job and wait until running flink.runDetached(jobGraph); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> flink.getJobStatus(jobGraph.getJobID()), Time.milliseconds(10), deadline, @@ -160,7 +160,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); // wait until the job is canceled - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> flink.getJobStatus(jobGraph.getJobID()), Time.milliseconds(10), deadline, diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 0ead861adbaf7..d4063184b62d4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -153,7 +153,7 @@ public void go() throws Exception { try { NotifyingMapper.notifyLatch.await(); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> { try { return CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID())); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index e7067086ce1ba..9140570a89009 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -240,7 +240,7 @@ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceF client.cancel(jobId); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> client.getJobStatus(jobId), Time.milliseconds(50), Deadline.now().plus(Duration.ofSeconds(30)), diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java index 7c00de7f43684..9bec331608ea6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -258,7 +258,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO // now the job should be able to go to RUNNING again and then eventually to FINISHED, // which it only does if it could successfully restore - CompletableFuture jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture jobStatusFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobID), Time.milliseconds(50), deadline, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java index ec3f1e1b9d3fa..e83f9ab8c583f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java @@ -393,7 +393,7 @@ public void run() { } private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException { - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())), Time.milliseconds(50L), org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())), diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 90ea8796ca57c..ed987d677d8bb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -294,7 +294,7 @@ static DispatcherGateway retrieveDispatcherGateway(RpcService rpcService, HighAv } private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException { - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> dispatcherGateway.requestClusterOverview(timeout), Time.milliseconds(50L), Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())), @@ -306,7 +306,7 @@ private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time } private Collection waitForRunningJobs(ClusterClient clusterClient, Time timeout) throws ExecutionException, InterruptedException { - return FutureUtils.retrySuccesfulWithDelay( + return FutureUtils.retrySuccessfulWithDelay( CheckedSupplier.unchecked(clusterClient::listJobs), Time.milliseconds(50L), Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())), diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 097616feb9663..0b79af513f6bb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -118,7 +118,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient clusterClien clusterClient.submitJob(jobToMigrate, classLoader); - CompletableFuture jobRunningFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture jobRunningFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobToMigrate.getJobID()), Time.milliseconds(50), deadline, @@ -152,7 +152,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient clusterClien assertNotNull("Could not take savepoint.", savepointPath); - CompletableFuture jobCanceledFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture jobCanceledFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobToMigrate.getJobID()), Time.milliseconds(50), deadline, @@ -173,7 +173,7 @@ private void restoreJob(ClassLoader classLoader, ClusterClient clusterClient, clusterClient.submitJob(jobToRestore, classLoader); - CompletableFuture jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture jobStatusFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobToRestore.getJobID()), Time.milliseconds(50), deadline,