Skip to content

Commit

Permalink
[FLINK-18851][runtime-web] Add checkpoint type to checkpoint history
Browse files Browse the repository at this point in the history
  • Loading branch information
gm7y8 authored and rmetzger committed Oct 29, 2020
1 parent 146269d commit b2a342c
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 4 deletions.
16 changes: 16 additions & 0 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,10 @@
"alignment_buffered" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"end_to_end_duration" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1544,6 +1548,10 @@
"alignment_buffered" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"discarded" : {
"type" : "boolean"
},
Expand Down Expand Up @@ -1633,6 +1641,10 @@
"alignment_buffered" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"end_to_end_duration" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1882,6 +1894,10 @@
"alignment_buffered" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"end_to_end_duration" : {
"type" : "integer"
},
Expand Down
16 changes: 16 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
Expand Down Expand Up @@ -1088,6 +1092,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
Expand Down Expand Up @@ -1166,6 +1174,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
Expand Down Expand Up @@ -1292,6 +1304,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export interface CheckPointCompletedStatisticsInterface {
tasks: CheckPointTaskStatisticsInterface;
external_path: string;
discarded: boolean;
checkpoint_type: string;
}

export interface CheckPointTaskStatisticsInterface {
Expand All @@ -114,6 +115,7 @@ export interface CheckPointConfigInterface {
enabled: boolean;
delete_on_cancellation: boolean;
};
unaligned_checkpoints: boolean;
}

export interface CheckPointDetailInterface {
Expand All @@ -130,6 +132,7 @@ export interface CheckPointDetailInterface {
failure_message?: string;
num_subtasks: number;
num_acknowledged_subtasks: number;
checkpoint_type: string;
tasks: Array<{
[ taskId: string ]: {
id: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
<strong>Path:</strong> {{checkPointDetail?.external_path || '-'}}
<nz-divider nzType="vertical"></nz-divider>
<strong>Discarded:</strong> {{checkPointDetail?.discarded || '-'}}
<nz-divider nzType="vertical"></nz-divider>
<strong>Checkpoint Type:</strong> {{checkPointType}}
<ng-container *ngIf="checkPointDetail?.failure_message">
<nz-divider nzType="vertical"></nz-divider>
<strong>Failure Message:</strong> {{checkPointDetail?.failure_message || '-' }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import {
CheckPointCompletedStatisticsInterface,
CheckPointDetailInterface,
JobDetailCorrectInterface,
VerticesItemInterface
VerticesItemInterface,
CheckPointConfigInterface
} from 'interfaces';
import { forkJoin } from 'rxjs';
import { first } from 'rxjs/operators';
import { JobService } from 'services';

Expand All @@ -35,6 +37,7 @@ import { JobService } from 'services';
export class JobCheckpointsDetailComponent implements OnInit {
innerCheckPoint: CheckPointCompletedStatisticsInterface;
jobDetail: JobDetailCorrectInterface;
checkPointType: string;

@Input()
set checkPoint(value) {
Expand All @@ -47,6 +50,7 @@ export class JobCheckpointsDetailComponent implements OnInit {
}

checkPointDetail: CheckPointDetailInterface;
checkPointConfig: CheckPointConfigInterface;
listOfVertex: VerticesItemInterface[] = [];
isLoading = true;

Expand All @@ -57,9 +61,26 @@ export class JobCheckpointsDetailComponent implements OnInit {
refresh() {
this.isLoading = true;
if (this.jobDetail && this.jobDetail.jid) {
this.jobService.loadCheckpointDetails(this.jobDetail.jid, this.checkPoint.id).subscribe(
data => {
this.checkPointDetail = data;
forkJoin([
this.jobService.loadCheckpointConfig(this.jobDetail.jid),
this.jobService.loadCheckpointDetails(this.jobDetail.jid, this.checkPoint.id)
]).subscribe(
([config, detail]) => {
this.checkPointConfig = config;
this.checkPointDetail = detail;
if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') {
if (this.checkPointConfig.unaligned_checkpoints) {
this.checkPointType = 'unaligned checkpoint';
} else {
this.checkPointType = 'aligned checkpoint';
}
} else if (this.checkPointDetail.checkpoint_type === 'SYNC_SAVEPOINT') {
this.checkPointType = 'savepoint on cancel';
} else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') {
this.checkPointType = 'savepoint';
} else {
this.checkPointType = '-';
}
this.isLoading = false;
this.cdr.markForCheck();
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
Expand Down Expand Up @@ -87,6 +88,8 @@ public class CheckpointStatistics implements ResponseBody {

public static final String FIELD_NAME_TASKS = "tasks";

public static final String FIELD_NAME_CHECKPOINT_TYPE = "checkpoint_type";

@JsonProperty(FIELD_NAME_ID)
private final long id;

Expand Down Expand Up @@ -123,6 +126,9 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;

@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
private final CheckpointType checkpointType;

@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
Expand All @@ -141,6 +147,7 @@ private CheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
this.id = id;
this.status = Preconditions.checkNotNull(status);
Expand All @@ -154,6 +161,7 @@ private CheckpointStatistics(
this.persistedData = persistedData;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
this.checkpointType = Preconditions.checkNotNull(checkpointType);
this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask);
}

Expand Down Expand Up @@ -193,6 +201,10 @@ public int getNumAckSubtasks() {
return numAckSubtasks;
}

public CheckpointType getCheckpointType() {
return checkpointType;
}

@Nullable
public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
return checkpointStatisticsPerTask;
Expand All @@ -219,6 +231,7 @@ public boolean equals(Object o) {
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
Objects.equals(checkpointType, that.checkpointType) &&
Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
}

Expand All @@ -237,6 +250,7 @@ public int hashCode() {
persistedData,
numSubtasks,
numAckSubtasks,
checkpointType,
checkpointStatisticsPerTask);
}

Expand Down Expand Up @@ -289,6 +303,7 @@ public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpoi
completedCheckpointStats.getPersistedData(),
completedCheckpointStats.getNumberOfSubtasks(),
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
completedCheckpointStats.getProperties().getCheckpointType(),
checkpointStatisticsPerTask,
completedCheckpointStats.getExternalPath(),
completedCheckpointStats.isDiscarded());
Expand All @@ -308,6 +323,7 @@ public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpoi
failedCheckpointStats.getPersistedData(),
failedCheckpointStats.getNumberOfSubtasks(),
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
failedCheckpointStats.getProperties().getCheckpointType(),
checkpointStatisticsPerTask,
failedCheckpointStats.getFailureTimestamp(),
failedCheckpointStats.getFailureMessage());
Expand All @@ -327,6 +343,7 @@ public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpoi
pendingCheckpointStats.getPersistedData(),
pendingCheckpointStats.getNumberOfSubtasks(),
pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(),
pendingCheckpointStats.getProperties().getCheckpointType(),
checkpointStatisticsPerTask
);
} else {
Expand Down Expand Up @@ -369,6 +386,7 @@ public CompletedCheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
Expand All @@ -385,6 +403,7 @@ public CompletedCheckpointStatistics(
persistedData,
numSubtasks,
numAckSubtasks,
checkpointType,
checkpointingStatisticsPerTask);

this.externalPath = externalPath;
Expand Down Expand Up @@ -452,6 +471,7 @@ public FailedCheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
Expand All @@ -468,6 +488,7 @@ public FailedCheckpointStatistics(
persistedData,
numSubtasks,
numAckSubtasks,
checkpointType,
checkpointingStatisticsPerTask);

this.failureTimestamp = failureTimestamp;
Expand Down Expand Up @@ -524,6 +545,7 @@ public PendingCheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask) {
super(
id,
Expand All @@ -538,6 +560,7 @@ public PendingCheckpointStatistics(
persistedData,
numSubtasks,
numAckSubtasks,
checkpointType,
checkpointingStatisticsPerTask);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages.checkpoints;

import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;

Expand Down Expand Up @@ -90,6 +91,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
44L,
10,
10,
CheckpointType.CHECKPOINT,
Collections.emptyMap(),
null,
false);
Expand All @@ -107,6 +109,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
4244L,
9,
9,
CheckpointType.SAVEPOINT,
checkpointStatisticsPerTask,
"externalPath",
false);
Expand All @@ -124,6 +127,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
22L,
11,
9,
CheckpointType.CHECKPOINT,
Collections.emptyMap(),
100L,
"Test failure");
Expand All @@ -147,6 +151,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
16L,
10,
10,
CheckpointType.CHECKPOINT,
Collections.emptyMap()
);

Expand Down

0 comments on commit b2a342c

Please sign in to comment.