Skip to content

Commit

Permalink
[FLINK-14820][core] Remove unused scheduleWithDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 28, 2022
1 parent 349ae05 commit f25f9b8
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,60 +249,6 @@ public static <T> CompletableFuture<T> retryWithDelay(
return retryWithDelay(operation, retryStrategy, (throwable) -> true, scheduledExecutor);
}

/**
* Schedule the operation with the given delay.
*
* @param operation to schedule
* @param delay delay to schedule
* @param scheduledExecutor executor to be used for the operation
* @return Future which schedules the given operation with given delay.
*/
public static CompletableFuture<Void> scheduleWithDelay(
final Runnable operation, final Time delay, final ScheduledExecutor scheduledExecutor) {
Supplier<Void> operationSupplier =
() -> {
operation.run();
return null;
};
return scheduleWithDelay(operationSupplier, delay, scheduledExecutor);
}

/**
* Schedule the operation with the given delay.
*
* @param operation to schedule
* @param delay delay to schedule
* @param scheduledExecutor executor to be used for the operation
* @param <T> type of the result
* @return Future which schedules the given operation with given delay.
*/
public static <T> CompletableFuture<T> scheduleWithDelay(
final Supplier<T> operation,
final Time delay,
final ScheduledExecutor scheduledExecutor) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();

ScheduledFuture<?> scheduledFuture =
scheduledExecutor.schedule(
() -> {
try {
resultFuture.complete(operation.get());
} catch (Throwable t) {
resultFuture.completeExceptionally(t);
}
},
delay.getSize(),
delay.getUnit());

resultFuture.whenComplete(
(t, throwable) -> {
if (!scheduledFuture.isDone()) {
scheduledFuture.cancel(false);
}
});
return resultFuture;
}

private static <T> void retryOperationWithDelay(
final CompletableFuture<T> resultFuture,
final Supplier<CompletableFuture<T>> operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,58 +290,6 @@ public void testRetryWithDelayRetryStrategyCancellation() {
assertTrue(scheduledFuture.isCancelled());
}

/** Tests that the operation could be scheduled with expected delay. */
@Test
public void testScheduleWithDelay() throws Exception {
final ManuallyTriggeredScheduledExecutor scheduledExecutor =
new ManuallyTriggeredScheduledExecutor();

final int expectedResult = 42;
CompletableFuture<Integer> completableFuture =
FutureUtils.scheduleWithDelay(
() -> expectedResult, Time.milliseconds(0), scheduledExecutor);

scheduledExecutor.triggerScheduledTasks();
final int actualResult = completableFuture.get();

assertEquals(expectedResult, actualResult);
}

/** Tests that a scheduled task is canceled if the scheduled future is being cancelled. */
@Test
public void testScheduleWithDelayCancellation() {
final ManuallyTriggeredScheduledExecutor scheduledExecutor =
new ManuallyTriggeredScheduledExecutor();

final Runnable noOpRunnable = () -> {};
CompletableFuture<Void> completableFuture =
FutureUtils.scheduleWithDelay(
noOpRunnable, TestingUtils.infiniteTime(), scheduledExecutor);

final ScheduledFuture<?> scheduledFuture =
scheduledExecutor.getActiveScheduledTasks().iterator().next();

completableFuture.cancel(false);

assertTrue(completableFuture.isCancelled());
assertTrue(scheduledFuture.isCancelled());
}

/** Tests that the operation is never scheduled if the delay is virtually infinite. */
@Test
public void testScheduleWithInfiniteDelayNeverSchedulesOperation() {
final Runnable noOpRunnable = () -> {};
final CompletableFuture<Void> completableFuture =
FutureUtils.scheduleWithDelay(
noOpRunnable,
TestingUtils.infiniteTime(),
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

assertFalse(completableFuture.isDone());

completableFuture.cancel(false);
}

/** Tests that a future is timed out after the specified timeout. */
@Test
public void testOrTimeout() throws Exception {
Expand Down

0 comments on commit f25f9b8

Please sign in to comment.