Skip to content

Commit

Permalink
[hotfix] Let delayed AdaptiveScheduler.runIfState method return Sched…
Browse files Browse the repository at this point in the history
…uledFuture

By letting the AdaptiveScheduler.runIfState method return a ScheduledFuture, it is now
possible to cancel scheduled actions.
  • Loading branch information
tillrohrmann committed Mar 20, 2021
1 parent c4545e0 commit 53ac701
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -962,8 +963,8 @@ public void runIfState(State expectedState, Runnable action) {
}

@Override
public void runIfState(State expectedState, Runnable action, Duration delay) {
componentMainThreadExecutor.schedule(
public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
return componentMainThreadExecutor.schedule(
() -> runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

/**
* State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the
Expand Down Expand Up @@ -149,8 +150,9 @@ ArchivedExecutionGraph getArchivedExecutionGraph(
* the action
* @param action action to run if the expected state equals the actual state
* @param delay delay after which to run the action
* @return a ScheduledFuture representing pending completion of the task
*/
void runIfState(State expectedState, Runnable action, Duration delay);
ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);

/**
* Try to assign slots to the created {@link ExecutionGraph}. If it is possible, then this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;

/** State which describes a job which is currently being restarted. */
class Restarting extends StateWithExecutionGraph {
Expand Down Expand Up @@ -105,8 +106,9 @@ void goToCanceling(
* the delay
* @param action action to run if the state equals the expected state
* @param delay delay after which the action should be executed
* @return a ScheduledFuture representing pending completion of the task
*/
void runIfState(State expectedState, Runnable action, Duration delay);
ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);
}

static class Factory implements StateFactory<Restarting> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;

/**
* State which describes that the scheduler is waiting for resources in order to execute the job.
Expand Down Expand Up @@ -142,8 +143,9 @@ ArchivedExecutionGraph getArchivedExecutionGraph(
* the action
* @param action action to run if the expected state equals the actual state
* @param delay delay after which to run the action
* @return a ScheduledFuture representing pending completion of the task
*/
void runIfState(State expectedState, Runnable action, Duration delay);
ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);
}

static class Factory implements StateFactory<WaitingForResources> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.CompletedScheduledFuture;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.FlinkException;
Expand All @@ -31,6 +32,7 @@

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -187,10 +189,12 @@ public ArchivedExecutionGraph getArchivedExecutionGraph(
}

@Override
public void runIfState(State expectedState, Runnable action, Duration delay) {
public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
if (!hadStateTransitionHappened) {
action.run();
}

return CompletedScheduledFuture.create(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.CompletedScheduledFuture;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
Expand All @@ -29,6 +30,7 @@
import org.junit.Test;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;

import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
Expand Down Expand Up @@ -172,10 +174,11 @@ public void goToWaitingForResources() {
}

@Override
public void runIfState(State expectedState, Runnable action, Duration delay) {
public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
if (!hadStateTransition) {
action.run();
}
return CompletedScheduledFuture.create(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
Expand All @@ -33,12 +34,12 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand All @@ -57,12 +58,7 @@ public void testTransitionToCreatingExecutionGraph() throws Exception {

new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
// run delayed actions
for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) {
scheduledRunnable.runAction();
if (ctx.hasStateTransition()) {
break;
}
}
ctx.runScheduledTasks();
}
}

Expand Down Expand Up @@ -100,12 +96,7 @@ public void testResourceTimeout() throws Exception {
ctx.setExpectCreatingExecutionGraph();

// immediately execute all scheduled runnables
assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) {
if (scheduledRunnable.getExpectedState() == wfr) {
scheduledRunnable.runAction();
}
}
ctx.runScheduledTasks();
}
}

Expand Down Expand Up @@ -172,13 +163,9 @@ private static class MockContext implements WaitingForResources.Context, AutoClo
new StateValidator<>("finished");

private Supplier<Boolean> hasEnoughResourcesSupplier = () -> false;
private final List<ScheduledRunnable> scheduledRunnables = new ArrayList<>();
private final List<ScheduledTask<Void>> scheduledTasks = new ArrayList<>();
private boolean hasStateTransition = false;

public List<ScheduledRunnable> getScheduledRunnables() {
return scheduledRunnables;
}

public void setHasEnoughResources(Supplier<Boolean> sup) {
hasEnoughResourcesSupplier = sup;
}
Expand All @@ -191,6 +178,12 @@ void setExpectCreatingExecutionGraph() {
creatingExecutionGraphStateValidator.expectInput(none -> {});
}

void runScheduledTasks() {
for (ScheduledTask<Void> scheduledTask : scheduledTasks) {
scheduledTask.execute();
}
}

@Override
public void close() throws Exception {
creatingExecutionGraphStateValidator.close();
Expand All @@ -212,8 +205,21 @@ public boolean hasEnoughResources(ResourceCounter desiredResources) {
}

@Override
public void runIfState(State expectedState, Runnable action, Duration delay) {
scheduledRunnables.add(new ScheduledRunnable(expectedState, action, delay));
public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
final ScheduledTask<Void> scheduledTask =
new ScheduledTask<>(
() -> {
if (!hasStateTransition) {
action.run();
}

return null;
},
delay.toMillis());

scheduledTasks.add(scheduledTask);

return scheduledTask;
}

@Override
Expand All @@ -233,26 +239,6 @@ public boolean hasStateTransition() {
}
}

private static final class ScheduledRunnable {
private final Runnable action;
private final State expectedState;
private final Duration delay;

private ScheduledRunnable(State expectedState, Runnable action, Duration delay) {
this.expectedState = expectedState;
this.action = action;
this.delay = delay;
}

public void runAction() {
action.run();
}

public State getExpectedState() {
return expectedState;
}
}

static <T> Consumer<T> assertNonNull() {
return (item) -> assertThat(item, notNullValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.testutils;

import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A {@link ScheduledFuture} that is already completed with a fixed value at construction time.
 *
 * <p>The future always reports itself as done and never as cancelled, its remaining delay is
 * always zero, and both {@code get} variants hand back the stored value right away. Instances are
 * obtained through {@link #create(Object)}.
 *
 * @param <T> type of the value held by this future
 */
public final class CompletedScheduledFuture<T> implements ScheduledFuture<T> {

    /** The result this future was completed with; may be {@code null}. */
    private final T value;

    private CompletedScheduledFuture(T value) {
        this.value = value;
    }

    /**
     * Creates a future that is already completed with the given value.
     *
     * @param value result to return from {@link #get()}; may be {@code null}
     * @param <T> type of the value
     * @return a completed future wrapping {@code value}
     */
    public static <T> CompletedScheduledFuture<T> create(T value) {
        return new CompletedScheduledFuture<>(value);
    }

    // ------------------------------------------------------------------------
    //  Delayed
    // ------------------------------------------------------------------------

    /** Returns zero for every unit, since there is nothing left to wait for. */
    @Override
    public long getDelay(TimeUnit unit) {
        return 0;
    }

    /** Orders this future relative to {@code o} by comparing the delays in milliseconds. */
    @Override
    public int compareTo(Delayed o) {
        final long ownDelayMillis = getDelay(TimeUnit.MILLISECONDS);
        final long otherDelayMillis = o.getDelay(TimeUnit.MILLISECONDS);
        return Long.compare(ownDelayMillis, otherDelayMillis);
    }

    // ------------------------------------------------------------------------
    //  Future
    // ------------------------------------------------------------------------

    /** Always returns {@code false}: this future is already completed. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /** Always returns {@code false}. */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /** Always returns {@code true}. */
    @Override
    public boolean isDone() {
        return true;
    }

    /** Returns the stored value immediately. */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        return value;
    }

    /** Returns the stored value immediately; the timeout arguments are not consulted. */
    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return value;
    }
}
Loading

0 comments on commit 53ac701

Please sign in to comment.