Skip to content

Commit

Permalink
[FLINK-21484][rest] Do not expose internal CheckpointType enum via th…
Browse files Browse the repository at this point in the history
…e REST API

Instead use an intermediate class that can provide a backward compatibility and
detach the CheckpointType from the REST API.
  • Loading branch information
pnowojski committed Feb 24, 2021
1 parent 7c286d7 commit fbdc2c0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class CheckpointStatistics implements ResponseBody {
private final int numAckSubtasks;

@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
private final CheckpointType checkpointType;
private final RestAPICheckpointType checkpointType;

@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
Expand All @@ -152,7 +152,7 @@ private CheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
@JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
Expand Down Expand Up @@ -208,7 +208,7 @@ public int getNumAckSubtasks() {
return numAckSubtasks;
}

public CheckpointType getCheckpointType() {
public RestAPICheckpointType getCheckpointType() {
return checkpointType;
}

Expand Down Expand Up @@ -313,7 +313,8 @@ public static CheckpointStatistics generateCheckpointStatistics(
completedCheckpointStats.getPersistedData(),
completedCheckpointStats.getNumberOfSubtasks(),
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
completedCheckpointStats.getProperties().getCheckpointType(),
RestAPICheckpointType.valueOf(
completedCheckpointStats.getProperties().getCheckpointType()),
checkpointStatisticsPerTask,
completedCheckpointStats.getExternalPath(),
completedCheckpointStats.isDiscarded());
Expand All @@ -334,7 +335,8 @@ public static CheckpointStatistics generateCheckpointStatistics(
failedCheckpointStats.getPersistedData(),
failedCheckpointStats.getNumberOfSubtasks(),
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
failedCheckpointStats.getProperties().getCheckpointType(),
RestAPICheckpointType.valueOf(
failedCheckpointStats.getProperties().getCheckpointType()),
checkpointStatisticsPerTask,
failedCheckpointStats.getFailureTimestamp(),
failedCheckpointStats.getFailureMessage());
Expand All @@ -355,7 +357,8 @@ public static CheckpointStatistics generateCheckpointStatistics(
pendingCheckpointStats.getPersistedData(),
pendingCheckpointStats.getNumberOfSubtasks(),
pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(),
pendingCheckpointStats.getProperties().getCheckpointType(),
RestAPICheckpointType.valueOf(
pendingCheckpointStats.getProperties().getCheckpointType()),
checkpointStatisticsPerTask);
} else {
throw new IllegalArgumentException(
Expand All @@ -365,6 +368,29 @@ public static CheckpointStatistics generateCheckpointStatistics(
}
}

/**
* Backward compatibility layer between internal {@link CheckpointType} and a field used in
* {@link CheckpointStatistics}.
*/
enum RestAPICheckpointType {
CHECKPOINT,
SAVEPOINT,
SYNC_SAVEPOINT;

public static RestAPICheckpointType valueOf(CheckpointType checkpointType) {
switch (checkpointType) {
case CHECKPOINT:
return CHECKPOINT;
case SAVEPOINT:
return SAVEPOINT;
case SYNC_SAVEPOINT:
return SYNC_SAVEPOINT;
default:
throw new UnsupportedOperationException(checkpointType.toString());
}
}
}

// ---------------------------------------------------------------------
// Static inner classes
// ---------------------------------------------------------------------
Expand Down Expand Up @@ -397,7 +423,7 @@ public CompletedCheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
@JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
Expand Down Expand Up @@ -481,7 +507,7 @@ public FailedCheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
@JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
Expand Down Expand Up @@ -555,7 +581,7 @@ public PendingCheckpointStatistics(
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType checkpointType,
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType,
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
@JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics.RestAPICheckpointType;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -76,7 +77,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
44L,
10,
10,
CheckpointType.CHECKPOINT,
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT),
Collections.emptyMap(),
null,
false);
Expand All @@ -95,7 +96,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
4244L,
9,
9,
CheckpointType.SAVEPOINT,
RestAPICheckpointType.valueOf(CheckpointType.SAVEPOINT),
checkpointStatisticsPerTask,
"externalPath",
false);
Expand All @@ -114,7 +115,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
22L,
11,
9,
CheckpointType.CHECKPOINT,
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT),
Collections.emptyMap(),
100L,
"Test failure");
Expand All @@ -136,7 +137,7 @@ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
16L,
10,
10,
CheckpointType.CHECKPOINT,
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT),
Collections.emptyMap());

final CheckpointingStatistics.LatestCheckpoints latestCheckpoints =
Expand Down

0 comments on commit fbdc2c0

Please sign in to comment.