[FLINK-7960] [tests] Fix race conditions in ExecutionGraphRestartTest#completeCancellingForAllVertices #4933

Closed

Conversation

tillrohrmann
Contributor

@tillrohrmann tillrohrmann commented Nov 1, 2017

What is the purpose of the change

This PR fixes two race conditions. The first is between waitUntilJobStatus(eg, JobStatus.FAILING, 1000)
and the subsequent completeCancellingForAllVertices call: when the job reaches FAILING, not all
executions are necessarily in state CANCELING yet.

The second is between completeCancellingForAllVertices and the fixed delay restart strategy
configured without a delay. Suppose the 10th task is the one that failed. Completing the
cancellation of the first 9 tasks is then enough for the restart strategy to restart the job.
If the restart happens before completeCancellingForAllVertices has reached the execution of the
10th task, the call can end up cancelling a fresh execution.
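
One way to close the first race is to wait until every execution has reached CANCELING, rather than only waiting for the job status, before completing the cancellation. The following is a minimal, self-contained sketch of such a polling wait. The names `WaitUtil`, `waitUntil`, `allCanceling`, and the simulated `State` enum are hypothetical; this is not the code changed in this PR and does not use Flink classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class WaitUtil {

    /** Hypothetical stand-in for an execution state; not Flink's ExecutionState. */
    enum State { RUNNING, CANCELING }

    /** Polls the condition until it holds or the timeout (in milliseconds) expires. */
    static void waitUntil(BooleanSupplier condition, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(1);
        }
    }

    /** True only if every simulated execution has reached CANCELING. */
    static boolean allCanceling(List<State> states) {
        return states.stream().allMatch(s -> s == State.CANCELING);
    }

    public static void main(String[] args) throws Exception {
        final List<State> states = new CopyOnWriteArrayList<>(
                Arrays.asList(State.RUNNING, State.RUNNING, State.RUNNING));

        // Simulates asynchronous cancel calls arriving one after another.
        Thread canceller = new Thread(() -> {
            for (int i = 0; i < states.size(); i++) {
                states.set(i, State.CANCELING);
            }
        });
        canceller.start();

        // Proceed only once all executions are CANCELING, not just the job status.
        waitUntil(() -> allCanceling(states), 1000);
        canceller.join();
        System.out.println("all executions are CANCELING");
    }
}
```

In a real test the condition would inspect the current execution attempts of the graph; the second race additionally requires that the restart cannot overtake the cancellation, which this sketch does not model.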

R @GJL

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

tillrohrmann force-pushed the hardenExecutionGraphRestartTest branch 2 times, most recently from 7ec9949 to ae15481, November 1, 2017 16:00

[hotfix] Make WaitForTasks using an AtomicInteger
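
A condition like this may be invoked from several threads, so a plain int counter could lose updates. The sketch below shows a counter-based condition built on an AtomicInteger. It is a hypothetical reconstruction, assuming the condition is a Consumer that completes a future after a fixed number of calls; `WaitForTasksSketch` and `getFuture` are illustrative names, and String stands in for ExecutionAttemptID.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class WaitForTasksSketch implements Consumer<String> {

    private final int tasksToWaitFor;
    private final AtomicInteger counter = new AtomicInteger();
    private final CompletableFuture<Boolean> allTasksReceived = new CompletableFuture<>();

    public WaitForTasksSketch(int tasksToWaitFor) {
        this.tasksToWaitFor = tasksToWaitFor;
    }

    public CompletableFuture<Boolean> getFuture() {
        return allTasksReceived;
    }

    @Override
    public void accept(String attemptId) {
        // incrementAndGet is atomic, so exactly one caller observes the final count.
        if (counter.incrementAndGet() == tasksToWaitFor) {
            allTasksReceived.complete(true);
        }
    }

    public static void main(String[] args) throws Exception {
        WaitForTasksSketch condition = new WaitForTasksSketch(10);
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            final String id = "attempt-" + i;
            threads[i] = new Thread(() -> condition.accept(id));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(condition.getFuture().isDone());
    }
}
```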
tillrohrmann force-pushed the hardenExecutionGraphRestartTest branch from ae15481 to 2c146ca, November 1, 2017 16:01
@GJL
Member

GJL commented Nov 1, 2017

On your branch, ExecutionGraphRestartTest#testRestartWithEagerSchedulingAndSlotSharing is failing consistently:

java.util.concurrent.TimeoutException
	at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus(ExecutionGraphTestUtils.java:116)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartWithEagerSchedulingAndSlotSharing(ExecutionGraphRestartTest.java:776)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)

Edit: Problem is fixed if you initialize the Optional field (see comment below)

@@ -48,6 +48,8 @@

private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;

private Optional<Consumer<ExecutionAttemptID>> optCancelCondition;
Member

This field will always be initialized to null. Isn't that a problem?

Contributor Author

Yup, it is. Fixed it.
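
To illustrate the issue raised here: an Optional field that is declared but never assigned is null, not Optional.empty(), so calling ifPresent on it throws a NullPointerException. A minimal sketch of the fix follows, assuming the field is consulted on each cancel call. `GatewaySketch`, `setCancelCondition`, and `cancelTask` are hypothetical names, and String stands in for ExecutionAttemptID.

```java
import java.util.Optional;
import java.util.function.Consumer;

public class GatewaySketch {

    // Without this initializer the field would be null, and
    // optCancelCondition.ifPresent(...) would throw a NullPointerException.
    private Optional<Consumer<String>> optCancelCondition = Optional.empty();

    public void setCancelCondition(Consumer<String> condition) {
        this.optCancelCondition = Optional.of(condition);
    }

    public void cancelTask(String attemptId) {
        // Safe whether or not a condition has been registered.
        optCancelCondition.ifPresent(condition -> condition.accept(attemptId));
    }

    public static void main(String[] args) {
        GatewaySketch gateway = new GatewaySketch();
        gateway.cancelTask("attempt-0"); // no condition registered, nothing happens

        StringBuilder seen = new StringBuilder();
        gateway.setCancelCondition(id -> seen.append(id));
        gateway.cancelTask("attempt-1");
        System.out.println(seen);
    }
}
```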

@@ -844,7 +844,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
 // failing in the meantime may happen and is no problem.
 // anything else is a serious problem !!!
 if (current != FAILED) {
-String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
+String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", vertex.getTaskNameWithSubtaskIndex(), state);
 LOG.error(message);
Member

nit: slf4j's {} placeholders should be used.

Contributor Author

@tillrohrmann tillrohrmann Nov 1, 2017

In this case the message is built up front deliberately, because we reuse it in the line below. Moreover, the logging statement is at error level and will thus be evaluated in almost all cases. What one could argue is whether plain string concatenation would be faster than String.format.

Member

Ok
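
The trade-off discussed in this thread can be sketched as follows: the message is built once and then used both for logging and by a second consumer. This is an illustration only. `CancelMessageSketch` and its methods are hypothetical, System.err stands in for the logger, and wrapping the message in an exception is an assumption about what the following line does with it.

```java
public class CancelMessageSketch {

    /** Builds the message with String.format, as in the diff above. */
    static String buildMessage(String taskName, String state) {
        return String.format(
                "Asynchronous race: Found %s in state %s after successful cancel call.",
                taskName, state);
    }

    /** Same message via plain concatenation, the alternative mentioned in the reply. */
    static String buildMessageConcat(String taskName, String state) {
        return "Asynchronous race: Found " + taskName + " in state " + state
                + " after successful cancel call.";
    }

    public static void main(String[] args) {
        // Built once, because it is needed twice.
        String message = buildMessage("Source (1/10)", "RUNNING");

        // Stand-in for LOG.error(message).
        System.err.println(message);

        // Assumed second use: the same text travels with the failure cause.
        Exception failure = new IllegalStateException(message);
        System.out.println(failure.getMessage().equals(
                buildMessageConcat("Source (1/10)", "RUNNING")));
    }
}
```

With {} placeholders the logger would format the string lazily, but the formatted text would then have to be built a second time for the other consumer.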

@GJL
Member

GJL commented Nov 1, 2017

While you are at it:

ExecutionGraphTestUtils#switchToRunning

public static void switchToRunning(ExecutionGraph eg) {
	// check that all executions are in state DEPLOYING
	for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
		final Execution exec = ev.getCurrentExecutionAttempt();
		assert(exec.getState() == ExecutionState.DEPLOYING);
	}

	// switch executions to RUNNING
	for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
		final Execution exec = ev.getCurrentExecutionAttempt();
		exec.switchToRunning();
	}
}

could be improved to

public static void switchToRunning(ExecutionGraph eg) {
	// check that all executions are in state DEPLOYING
	for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
		final Execution exec = ev.getCurrentExecutionAttempt();
		final ExecutionState executionState = exec.getState();
		assert executionState == ExecutionState.DEPLOYING
			: "Expected executionState to be DEPLOYING, was: " + executionState;
	}

	// switch executions to RUNNING
	for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
		final Execution exec = ev.getCurrentExecutionAttempt();
		exec.switchToRunning();
	}
}

so that the failure reason is more obvious.

@tillrohrmann
Contributor Author

Thanks for the review @GJL. I've addressed your comments.

@GJL
Member

GJL commented Nov 2, 2017

LGTM 👍

Tests didn't fail after ~1000 local executions.

Member

@GJL GJL left a comment

👍

@tillrohrmann
Contributor Author

Thanks a lot for your review @GJL. Merging this PR.
