Skip to content

Commit

Permalink
[FLINK-21255] Add tests for WaitingForResources state
Browse files Browse the repository at this point in the history
This closes apache#14852
  • Loading branch information
rmetzger committed Feb 4, 2021
1 parent f299328 commit 2a1c7cb
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ interface State {
/** This method is called whenever one transitions into this state. */
default void onEnter() {}

/**
* This method is called whenever one transitions out of this state.
*
* @param newState newState is the state into which the scheduler transitions
*/
default void onLeave(State newState) {}

/** Cancels the job execution. */
void cancel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;

Expand All @@ -36,19 +37,28 @@ class WaitingForResources implements State, ResourceConsumer {

private final Context context;

private final Logger logger;
private final Logger log;

private final ResourceCounter desiredResources;

WaitingForResources(Context context, Logger logger, ResourceCounter desiredResources) {
this.context = context;
this.logger = logger;
this.desiredResources = desiredResources;
private final Duration resourceTimeout;

WaitingForResources(
Context context,
Logger log,
ResourceCounter desiredResources,
Duration resourceTimeout) {
this.context = Preconditions.checkNotNull(context);
this.log = Preconditions.checkNotNull(log);
this.desiredResources = Preconditions.checkNotNull(desiredResources);
this.resourceTimeout = Preconditions.checkNotNull(resourceTimeout);
Preconditions.checkArgument(
!desiredResources.isEmpty(), "Desired resources must not be empty");
}

@Override
public void onEnter() {
context.runIfState(this, this::resourceTimeout, Duration.ofSeconds(10L));
context.runIfState(this, this::resourceTimeout, resourceTimeout);
notifyNewResourcesAvailable();
}

Expand All @@ -74,12 +84,12 @@ public ArchivedExecutionGraph getJob() {

@Override
public void handleGlobalFailure(Throwable cause) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.INITIALIZING, cause));
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause));
}

@Override
public Logger getLogger() {
return logger;
return log;
}

@Override
Expand All @@ -100,7 +110,6 @@ private void createExecutionGraphWithAvailableResources() {

context.goToExecuting(executionGraph);
} catch (Exception exception) {
logger.error("handling initialization failure", exception);
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, exception));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.declarative;

import org.apache.flink.util.Preconditions;

import java.util.function.Consumer;

import static org.junit.Assert.fail;

/**
* Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to track if correct input
* has been presented and if the state transition happened.
*
* @param <T> Type of the state to validate.
*/
public class StateValidator<T> {

private Runnable trap = () -> {};
private Consumer<T> consumer;
private final String stateName;

public StateValidator(String stateName) {
this.stateName = stateName;
expectNoStateTransition();
}

/**
* Expect an input, and validate it with the given asserter (if the state transition hasn't been
* validated, it will fail in the close method).
*
* @param asserter Consumer which validates the input to the state transition.
*/
public void expectInput(Consumer<T> asserter) {
consumer = Preconditions.checkNotNull(asserter);
trap =
() -> {
throw new AssertionError("No transition to " + stateName);
};
}

/**
* Call this method on the state transition, to register the transition, and validate the passed
* arguments.
*
* @param input Argument(s) of the state transition.
* @throws NullPointerException If no consumer has been set (an unexpected state transition
* occurred)
*/
public void validateInput(T input) {
trap = () -> {};
consumer.accept(input);
expectNoStateTransition();
}

/**
* If the validator has been activated, check if input has been provided (e.g. a state
* transition happened).
*/
public void close() {
trap.run();
}

public final void expectNoStateTransition() {
consumer = (T) -> fail("No consumer has been set. Unexpected state transition");
}
}
Loading

0 comments on commit 2a1c7cb

Please sign in to comment.