-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-8900] [yarn] Set correct application status when job is finished #5944
Conversation
Brings the logging definition in sync with other projects. Updates the classname for the suppressed logger in Netty to account for the new shading model introduced in Flink 1.4.
The test failure is an unrelated test flakeyness . |
I will try it out. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes work on EMR.
Running
HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster -d examples/streaming/WordCount.jar --input doesnotexist
yields
State | FinalStatus |
---|---|
FINISHED | FAILED |
in the YARN ResourceManager UI.
@@ -109,7 +119,11 @@ public MiniDispatcher( | |||
|
|||
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { | |||
// terminate the MiniDispatcher once we served the first JobResult successfully | |||
jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown()); | |||
jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throwable
isn't used. If jobResultFuture
cannot be completed exceptionally, thenAccept
should be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update
@@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc | |||
} | |||
} | |||
|
|||
@Override | |||
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) { | |||
terminationFuture.whenComplete((status, throwable) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throwable
isn't used. If terminationFuture
cannot be completed exceptionally, thenAccept
should be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update
jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> { | ||
ApplicationStatus status = result.getSerializedThrowable().isPresent() ? | ||
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; | ||
jobTerminationFuture.complete(status); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
I think the functional way would be:
jobTerminationFuture.complete(result.getSerializedThrowable()
.map(serializedThrowable -> ApplicationStatus.FAILED)
.orElse(ApplicationStatus.SUCCEEDED));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I had it like that initially, but found the above version more readable in the end, because we don't really use the serializedThrowable (making the map() a bit strange).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
What is the purpose of the change
When finite Flink applications (batch jobs) are sent to YARN in the detached mode, the final status is currently always the same, because the job's result is not passed to the logic that initiates the application shutdown.
This PR forwards the final job status via a future that is used to register the shutdown handlers.
Brief change log
JobTerminationFuture
in theMiniDispatcher
Verifying this change
Run the batch job as described above on YARN to succeed, check that the final application status is successful.
Run the batch job with a parameter to a non existing input file on YARN, check that the final application status is failed.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation