Skip to content

Commit

Permalink
[FLINK-14264][rest] Expose state backend name
Browse files Browse the repository at this point in the history
  • Loading branch information
klion26 authored and zentol committed Dec 8, 2019
1 parent bd521c1 commit 3005f04
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 9 deletions.
3 changes: 3 additions & 0 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,9 @@
"type" : "boolean"
}
}
},
"state_backend" : {
"type" : "string"
}
}
} </code>
Expand Down
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,9 @@
"type" : "boolean"
}
}
},
"state_backend" : {
"type" : "string"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public class CheckpointCoordinator {

private final Clock clock;

private final String stateBackName;
// --------------------------------------------------------------------------------------------

public CheckpointCoordinator(
Expand Down Expand Up @@ -268,6 +269,7 @@ public CheckpointCoordinator(
this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
this.failureManager = checkNotNull(failureManager);
this.clock = checkNotNull(clock);
this.stateBackName = checkpointStateBackend.getClass().getSimpleName();

this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
Expand Down Expand Up @@ -1223,6 +1225,10 @@ public boolean isPeriodicCheckpointingConfigured() {
return baseInterval != Long.MAX_VALUE;
}

public String getStateBackendName() {
return stateBackName;
}

// --------------------------------------------------------------------------------------------
// Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nullable;

import java.util.Map;
import java.util.Optional;

/**
* Common interface for the runtime {@link ExecutionGraph} and {@link ArchivedExecutionGraph}.
Expand Down Expand Up @@ -164,4 +165,11 @@ public interface AccessExecutionGraph {
* @return true, if the execution graph was archived, false otherwise
*/
boolean isArchived();

/**
* Returns the state backend name for this ExecutionGraph.
*
* @return The state backend name, or an empty Optional in the case of batch jobs
*/
Optional<String> getStateBackendName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

/**
* An archived execution graph represents a serializable form of the {@link ExecutionGraph}.
Expand Down Expand Up @@ -95,6 +96,9 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
@Nullable
private final CheckpointStatsSnapshot checkpointStatsSnapshot;

@Nullable
private final String stateBackendName;

public ArchivedExecutionGraph(
JobID jobID,
String jobName,
Expand All @@ -109,7 +113,8 @@ public ArchivedExecutionGraph(
ArchivedExecutionConfig executionConfig,
boolean isStoppable,
@Nullable CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) {
@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot,
@Nullable String stateBackendName) {

this.jobID = Preconditions.checkNotNull(jobID);
this.jobName = Preconditions.checkNotNull(jobName);
Expand All @@ -125,6 +130,7 @@ public ArchivedExecutionGraph(
this.isStoppable = isStoppable;
this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
this.checkpointStatsSnapshot = checkpointStatsSnapshot;
this.stateBackendName = stateBackendName;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -250,6 +256,11 @@ public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSeri
return serializedUserAccumulators;
}

@Override
public Optional<String> getStateBackendName() {
return Optional.ofNullable(stateBackendName);
}

class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {

private final Iterator<ArchivedExecutionJobVertex> jobVertices;
Expand Down Expand Up @@ -337,6 +348,9 @@ public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
executionGraph.getArchivedExecutionConfig(),
executionGraph.isStoppable(),
executionGraph.getCheckpointCoordinatorConfiguration(),
executionGraph.getCheckpointStatsSnapshot());
executionGraph.getCheckpointStatsSnapshot(),
executionGraph.getCheckpointCoordinator() != null
? executionGraph.getCheckpointCoordinator().getStateBackendName()
: null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -431,6 +432,11 @@ public boolean isArchived() {
return false;
}

@Override
public Optional<String> getStateBackendName() {
return Optional.ofNullable(checkpointCoordinator).map(CheckpointCoordinator::getStateBackendName);
}

public void enableCheckpointing(
CheckpointCoordinatorConfiguration chkConfig,
List<ExecutionJobVertex> verticesToTrigger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,16 @@ private static CheckpointConfigInfo createCheckpointConfigInfo(AccessExecutionGr
retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);

String stateBackendName = executionGraph.getStateBackendName().orElse(null);

return new CheckpointConfigInfo(
checkpointCoordinatorConfiguration.isExactlyOnce() ? CheckpointConfigInfo.ProcessingMode.EXACTLY_ONCE : CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
checkpointCoordinatorConfiguration.getCheckpointInterval(),
checkpointCoordinatorConfiguration.getCheckpointTimeout(),
checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
externalizedCheckpointInfo);
externalizedCheckpointInfo,
stateBackendName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class CheckpointConfigInfo implements ResponseBody {

public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";

public static final String FIELD_NAME_STATE_BACKEND = "state_backend";

@JsonProperty(FIELD_NAME_PROCESSING_MODE)
private final ProcessingMode processingMode;

Expand All @@ -71,20 +73,25 @@ public class CheckpointConfigInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
private final ExternalizedCheckpointInfo externalizedCheckpointInfo;

@JsonProperty(FIELD_NAME_STATE_BACKEND)
private final String stateBackend;

@JsonCreator
public CheckpointConfigInfo(
@JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo,
@JsonProperty(FIELD_NAME_STATE_BACKEND) String stateBackend) {
this.processingMode = Preconditions.checkNotNull(processingMode);
this.checkpointInterval = checkpointInterval;
this.checkpointTimeout = checkpointTimeout;
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
this.stateBackend = Preconditions.checkNotNull(stateBackend);
}

@Override
Expand All @@ -101,12 +108,14 @@ public boolean equals(Object o) {
minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
processingMode == that.processingMode &&
Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo) &&
Objects.equals(stateBackend, that.stateBackend);
}

@Override
public int hashCode() {
return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints,
maxConcurrentCheckpoints, externalizedCheckpointInfo, stateBackend);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ public SuspendableAccessExecutionGraph(JobID jobId) {
new ArchivedExecutionConfig(new ExecutionConfig()),
false,
null,
null);
null,
"stateBackendName");

jobStatus = super.getState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public ArchivedExecutionGraph build() {
archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
isStoppable,
null,
null
null,
"stateBackendName"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ protected CheckpointConfigInfo getTestResponseInstance() {
2L,
3L,
4,
externalizedCheckpointInfo);
externalizedCheckpointInfo,
"stateBackendName");

}
}

0 comments on commit 3005f04

Please sign in to comment.