Skip to content

Commit

Permalink
[FLINK-24355][runtime] Expose the flag for enabling checkpoints after…
Browse files Browse the repository at this point in the history
… tasks finish in the Web UI
  • Loading branch information
SteNicholas authored and dawidwys committed Oct 19, 2021
1 parent d8112a7 commit 9115192
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 5 deletions.
3 changes: 3 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,9 @@
},
"unaligned_checkpoints" : {
"type" : "boolean"
},
"checkpoints_after_tasks_finish" : {
"type" : "boolean"
}
}
} </code>
Expand Down
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,9 @@
},
"aligned_checkpoint_timeout" : {
"type" : "integer"
},
"checkpoints_after_tasks_finish" : {
"type" : "boolean"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export interface CheckPointConfigInterface {
unaligned_checkpoints: boolean;
tolerable_failed_checkpoints: number;
aligned_checkpoint_timeout: number;
checkpoints_after_tasks_finish: boolean;
}

export interface CheckPointDetailInterface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@
<td>Tolerable Failed Checkpoints</td>
<td>{{ checkPointConfig['tolerable_failed_checkpoints'] }}</td>
</tr>
<tr>
<td>Checkpoints With Finished Tasks</td>
<td>
{{ checkPointConfig['checkpoints_after_tasks_finish'] ? 'Enabled' : 'Disabled' }}
</td>
</tr>
</ng-container>
</tbody>
</nz-table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ private static CheckpointConfigInfo createCheckpointConfigInfo(
checkpointStorageName,
checkpointCoordinatorConfiguration.isUnalignedCheckpointsEnabled(),
checkpointCoordinatorConfiguration.getTolerableCheckpointFailureNumber(),
checkpointCoordinatorConfiguration.getAlignedCheckpointTimeout());
checkpointCoordinatorConfiguration.getAlignedCheckpointTimeout(),
checkpointCoordinatorConfiguration.isEnableCheckpointsAfterTasksFinish());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public class CheckpointConfigInfo implements ResponseBody {

public static final String FIELD_NAME_ALIGNED_CHECKPOINT_TIMEOUT = "aligned_checkpoint_timeout";

public static final String FIELD_NAME_CHECKPOINTS_AFTER_TASKS_FINISH =
"checkpoints_after_tasks_finish";

@JsonProperty(FIELD_NAME_PROCESSING_MODE)
private final ProcessingMode processingMode;

Expand Down Expand Up @@ -95,6 +98,9 @@ public class CheckpointConfigInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_ALIGNED_CHECKPOINT_TIMEOUT)
private final long alignedCheckpointTimeout;

@JsonProperty(FIELD_NAME_CHECKPOINTS_AFTER_TASKS_FINISH)
private final boolean checkpointsWithFinishedTasks;

@JsonCreator
public CheckpointConfigInfo(
@JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
Expand All @@ -108,7 +114,9 @@ public CheckpointConfigInfo(
@JsonProperty(FIELD_NAME_CHECKPOINT_STORAGE) String checkpointStorage,
@JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINTS) boolean unalignedCheckpoints,
@JsonProperty(FIELD_NAME_TOLERABLE_FAILED_CHECKPOINTS) int tolerableFailedCheckpoints,
@JsonProperty(FIELD_NAME_ALIGNED_CHECKPOINT_TIMEOUT) long alignedCheckpointTimeout) {
@JsonProperty(FIELD_NAME_ALIGNED_CHECKPOINT_TIMEOUT) long alignedCheckpointTimeout,
@JsonProperty(FIELD_NAME_CHECKPOINTS_AFTER_TASKS_FINISH)
boolean checkpointsWithFinishedTasks) {
this.processingMode = Preconditions.checkNotNull(processingMode);
this.checkpointInterval = checkpointInterval;
this.checkpointTimeout = checkpointTimeout;
Expand All @@ -120,6 +128,7 @@ public CheckpointConfigInfo(
this.unalignedCheckpoints = unalignedCheckpoints;
this.tolerableFailedCheckpoints = tolerableFailedCheckpoints;
this.alignedCheckpointTimeout = alignedCheckpointTimeout;
this.checkpointsWithFinishedTasks = checkpointsWithFinishedTasks;
}

@Override
Expand All @@ -141,7 +150,8 @@ public boolean equals(Object o) {
&& Objects.equals(checkpointStorage, that.checkpointStorage)
&& unalignedCheckpoints == that.unalignedCheckpoints
&& tolerableFailedCheckpoints == that.tolerableFailedCheckpoints
&& alignedCheckpointTimeout == that.alignedCheckpointTimeout;
&& alignedCheckpointTimeout == that.alignedCheckpointTimeout
&& checkpointsWithFinishedTasks == that.checkpointsWithFinishedTasks;
}

@Override
Expand All @@ -157,7 +167,8 @@ public int hashCode() {
checkpointStorage,
unalignedCheckpoints,
tolerableFailedCheckpoints,
alignedCheckpointTimeout);
alignedCheckpointTimeout,
checkpointsWithFinishedTasks);
}

/** Contains information about the externalized checkpoint configuration. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ protected CheckpointConfigInfo getTestResponseInstance() {
"checkpointStorageName",
true,
3,
4);
4,
true);
}
}

0 comments on commit 9115192

Please sign in to comment.