Skip to content

Commit

Permalink
[FLINK-5399] [checkpoints] Add more information about checkpoint to T…
Browse files Browse the repository at this point in the history
…riggerSavepointSuccess

This closes apache#3051
  • Loading branch information
shijinkui authored and StephanEwen committed Jan 5, 2017
1 parent 9f7ad84 commit d156f8d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testTriggerSavepointSuccess() throws Exception {

String savepointPath = "expectedSavepointPath";

triggerResponse.success(new TriggerSavepointSuccess(jobId, savepointPath));
triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));

CliFrontend frontend = new MockCliFrontend(
CliFrontendTestUtils.getConfigDir(), jobManager);
Expand Down Expand Up @@ -214,7 +214,7 @@ public void testTriggerSavepointCustomTarget() throws Exception {
any(FiniteDuration.class)))
.thenReturn(triggerResponse.future());
String savepointPath = "expectedSavepointPath";
triggerResponse.success(new TriggerSavepointSuccess(jobId, savepointPath));
triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));

CliFrontend frontend = new MockCliFrontend(
CliFrontendTestUtils.getConfigDir(), jobManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,12 @@ class JobManager(
override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
if (success != null) {
if (success.getExternalPath != null) {
senderRef ! TriggerSavepointSuccess(jobId, success.getExternalPath)
senderRef ! TriggerSavepointSuccess(
jobId,
success.getCheckpointID,
success.getExternalPath,
success.getTimestamp
)
} else {
senderRef ! TriggerSavepointFailure(
jobId, new Exception("Savepoint has not been persisted."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,12 @@ object JobManagerMessages {
* @param jobId The job ID for which the savepoint was triggered.
* @param savepointPath The path of the savepoint.
*/
case class TriggerSavepointSuccess(jobId: JobID, savepointPath: String)
case class TriggerSavepointSuccess(
jobId: JobID,
checkpointId: Long,
savepointPath: String,
triggerTime: Long
)

/**
* Response after a failed savepoint trigger containing the failure cause.
Expand Down

0 comments on commit d156f8d

Please sign in to comment.