[FLINK-8900] [yarn] Set correct application status when job is finished #5944

Closed
wants to merge 2 commits into from

StephanEwen
Contributor

What is the purpose of the change

When finite Flink applications (batch jobs) are submitted to YARN in detached mode, the final application status is currently always the same regardless of how the job ended, because the job's result is not passed to the logic that initiates the application shutdown.

This PR forwards the final job status via a future that is used to register the shutdown handlers.
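
The forwarding idea can be sketched in isolation as follows. This is a minimal illustration only, not the code in this PR: `Status` and `Result` are hypothetical stand-ins for Flink's `ApplicationStatus` and `JobResult`, and `forwardStatus` is an invented helper name.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class TerminationFutureSketch {

    // Hypothetical stand-in for Flink's ApplicationStatus.
    enum Status { SUCCEEDED, FAILED }

    // Hypothetical stand-in for a job result: holds a failure cause only if the job failed.
    static final class Result {
        private final Optional<Throwable> failure;

        Result(Throwable failureOrNull) {
            this.failure = Optional.ofNullable(failureOrNull);
        }

        Optional<Throwable> getFailure() {
            return failure;
        }
    }

    // Derives the final status from the job result and completes the termination future with it.
    static CompletableFuture<Status> forwardStatus(CompletableFuture<Result> jobResultFuture) {
        CompletableFuture<Status> terminationFuture = new CompletableFuture<>();
        jobResultFuture.thenAccept(result ->
            terminationFuture.complete(
                result.getFailure().isPresent() ? Status.FAILED : Status.SUCCEEDED));
        return terminationFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Result> jobResultFuture = new CompletableFuture<>();

        // The shutdown handler is registered up front, before the job has finished.
        forwardStatus(jobResultFuture).thenAccept(status ->
            System.out.println("Deregistering application with final status " + status));

        // Simulate a job that failed because its input was missing.
        jobResultFuture.complete(new Result(new RuntimeException("input file not found")));
    }
}
```

Because the shutdown handler only sees the termination future, it does not need to know how the status was derived.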

Brief change log

  • Introduce the JobTerminationFuture in the MiniDispatcher

Verifying this change

bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048  ./examples/streaming/WordCount.jar
  • Run the batch job as described above on YARN so that it succeeds, and check that the final application status is successful.

  • Run the batch job on YARN with a parameter pointing to a non-existent input file, and check that the final application status is failed.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Brings the logging definition in sync with other projects.
Updates the classname for the suppressed logger in Netty to account for the new
shading model introduced in Flink 1.4.
@StephanEwen
Contributor Author

StephanEwen commented Apr 30, 2018

The test failure is caused by an unrelated flaky test.

@GJL
Member

GJL commented Apr 30, 2018

I will try it out.

Member

@GJL GJL left a comment

Changes work on EMR.

Running

HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster -d examples/streaming/WordCount.jar --input doesnotexist

yields

State FinalStatus
FINISHED FAILED

in the YARN ResourceManager UI.

@@ -109,7 +119,11 @@ public MiniDispatcher(

  if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
  // terminate the MiniDispatcher once we served the first JobResult successfully
- jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+ jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
Member

throwable isn't used. If jobResultFuture cannot be completed exceptionally, thenAccept should be used.

Contributor Author

Will update
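
For reference, the difference between the two callback styles discussed in this thread can be sketched with plain `java.util.concurrent.CompletableFuture`. The class and method names below are hypothetical and not part of Flink. `whenComplete` runs on both normal and exceptional completion (the value is `null` in the exceptional case), while `thenAccept` runs only on normal completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class WhenCompleteVsThenAccept {

    // Returns {whenComplete invocations, thenAccept invocations} for an already completed future.
    static int[] countCallbacks(CompletableFuture<String> completedFuture) {
        AtomicInteger whenCompleteCalls = new AtomicInteger();
        AtomicInteger thenAcceptCalls = new AtomicInteger();

        // Invoked for normal and exceptional completion; value is null if the future failed.
        completedFuture.whenComplete((value, throwable) -> whenCompleteCalls.incrementAndGet());

        // Invoked only if the future completed normally.
        completedFuture.thenAccept(value -> thenAcceptCalls.incrementAndGet());

        return new int[] {whenCompleteCalls.get(), thenAcceptCalls.get()};
    }

    public static void main(String[] args) {
        CompletableFuture<String> succeeded = CompletableFuture.completedFuture("done");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        int[] normal = countCallbacks(succeeded);
        int[] exceptional = countCallbacks(failed);

        System.out.println("normal: whenComplete=" + normal[0] + ", thenAccept=" + normal[1]);
        System.out.println("exceptional: whenComplete=" + exceptional[0] + ", thenAccept=" + exceptional[1]);
    }
}
```

If a callback dereferences the value without checking the throwable, `thenAccept` is the safer choice, since it is never invoked with a `null` value caused by a failure.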

@@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
}
}

@Override
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
terminationFuture.whenComplete((status, throwable) ->
Member

throwable isn't used. If terminationFuture cannot be completed exceptionally, thenAccept should be used.

Contributor Author

Will update

jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
jobTerminationFuture.complete(status);
Member

@GJL GJL May 1, 2018

nit:

I think the functional way would be:

				jobTerminationFuture.complete(result.getSerializedThrowable()
					.map(serializedThrowable -> ApplicationStatus.FAILED)
					.orElse(ApplicationStatus.SUCCEEDED));

Contributor Author

True, I had it like that initially, but found the above version more readable in the end, because we don't really use the serializedThrowable (making the map() a bit strange).

Member

ok
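
The two variants discussed in this thread are behaviorally equivalent, which a small standalone sketch can show. `Status` is a hypothetical stand-in for `ApplicationStatus`, and the `Optional<Throwable>` parameter stands in for the value returned by `getSerializedThrowable()`.

```java
import java.util.Optional;

public class StatusMappingSketch {

    // Hypothetical stand-in for Flink's ApplicationStatus.
    enum Status { SUCCEEDED, FAILED }

    // Variant kept in the PR: explicit presence check with a ternary.
    static Status withTernary(Optional<Throwable> failure) {
        return failure.isPresent() ? Status.FAILED : Status.SUCCEEDED;
    }

    // Variant suggested in review: map the (unused) throwable to FAILED, default to SUCCEEDED.
    static Status withMap(Optional<Throwable> failure) {
        return failure.map(ignored -> Status.FAILED).orElse(Status.SUCCEEDED);
    }

    public static void main(String[] args) {
        Optional<Throwable> none = Optional.empty();
        Optional<Throwable> some = Optional.of(new RuntimeException("job failed"));

        System.out.println(withTernary(none) + " " + withMap(none));
        System.out.println(withTernary(some) + " " + withMap(some));
    }
}
```

Since the mapped value is never read, the choice between the two is purely one of readability.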

StephanEwen added a commit to StephanEwen/flink that referenced this pull request May 3, 2018
StephanEwen added a commit to StephanEwen/flink that referenced this pull request May 3, 2018
@asfgit asfgit closed this in 545d530 May 3, 2018
@StephanEwen StephanEwen deleted the yarn_fix branch July 12, 2018 10:38
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018