Skip to content

Commit

Permalink
[FLINK-20488][runtime][checkpoint] Show checkpoint type in the Web-UI…
Browse files Browse the repository at this point in the history
… (AC/UC) for each subtask
  • Loading branch information
curcur authored and pnowojski committed Jan 8, 2021
1 parent 51ec88b commit 0abbee9
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
<th nzSortKey="alignment.processed" nzShowSort><strong>Processed (persisted) Data</strong></th>
<th nzSortKey="alignment.duration" nzShowSort><strong>Alignment Duration</strong></th>
<th nzSortKey="start_delay" nzShowSort><strong>Start Delay</strong></th>
<th nzSortKey="unaligned_checkpoint" nzShowSort><strong>Unaligned Checkpoint</strong></th>
</tr>
</thead>
<tbody>
Expand All @@ -105,6 +106,7 @@
<td>{{ subTask['alignment']['processed'] | humanizeBytes }} ({{ subTask['alignment']['persisted'] | humanizeBytes }})</td>
<td>{{ subTask['alignment']['duration'] | humanizeDuration}}</td>
<td>{{ subTask['start_delay'] | humanizeDuration}}</td>
<td>{{ subTask['unaligned_checkpoint']}}</td>
</ng-container>
<ng-container *ngIf="subTask['status'] == 'pending_or_failed'">
<td colspan="7">n/a</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ public class CheckpointMetrics implements Serializable {

private final long checkpointStartDelayNanos;

/** Whether the checkpoint was completed as an unaligned checkpoint. */
private final boolean unalignedCheckpoint;

public CheckpointMetrics() {
this(-1L, -1L, -1L, -1L, -1L, -1L);
this(-1L, -1L, -1L, -1L, -1L, -1L, false);
}

public CheckpointMetrics(
Expand All @@ -53,7 +56,8 @@ public CheckpointMetrics(
long alignmentDurationNanos,
long syncDurationMillis,
long asyncDurationMillis,
long checkpointStartDelayNanos) {
long checkpointStartDelayNanos,
boolean unalignedCheckpoint) {

// these may be "-1", in case the values are unknown or not set
checkArgument(bytesProcessedDuringAlignment >= -1);
Expand All @@ -69,6 +73,7 @@ public CheckpointMetrics(
this.syncDurationMillis = syncDurationMillis;
this.asyncDurationMillis = asyncDurationMillis;
this.checkpointStartDelayNanos = checkpointStartDelayNanos;
this.unalignedCheckpoint = unalignedCheckpoint;
}

public long getBytesProcessedDuringAlignment() {
Expand All @@ -95,6 +100,10 @@ public long getCheckpointStartDelayNanos() {
return checkpointStartDelayNanos;
}

/**
 * Returns whether the checkpoint was completed as an unaligned checkpoint.
 *
 * @return the value of the {@code unalignedCheckpoint} flag supplied at construction
 */
public boolean getUnalignedCheckpoint() {
return unalignedCheckpoint;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -111,7 +120,8 @@ public boolean equals(Object o) {
&& alignmentDurationNanos == that.alignmentDurationNanos
&& syncDurationMillis == that.syncDurationMillis
&& asyncDurationMillis == that.asyncDurationMillis
&& checkpointStartDelayNanos == that.checkpointStartDelayNanos;
&& checkpointStartDelayNanos == that.checkpointStartDelayNanos
&& unalignedCheckpoint == that.unalignedCheckpoint;
}

@Override
Expand All @@ -122,7 +132,8 @@ public int hashCode() {
alignmentDurationNanos,
syncDurationMillis,
asyncDurationMillis,
checkpointStartDelayNanos);
checkpointStartDelayNanos,
unalignedCheckpoint);
}

@Override
Expand All @@ -140,6 +151,8 @@ public String toString() {
+ asyncDurationMillis
+ ", checkpointStartDelayNanos="
+ checkpointStartDelayNanos
+ ", unalignedCheckpoint="
+ unalignedCheckpoint
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CheckpointMetricsBuilder {
private long syncDurationMillis = -1L;
private long asyncDurationMillis = -1L;
private long checkpointStartDelayNanos = -1L;
private boolean unalignedCheckpoint = false;

public CheckpointMetricsBuilder setBytesProcessedDuringAlignment(
long bytesProcessedDuringAlignment) {
Expand Down Expand Up @@ -116,13 +117,19 @@ public long getCheckpointStartDelayNanos() {
return checkpointStartDelayNanos;
}

/**
 * Sets the flag recording whether the checkpoint is an unaligned checkpoint. The flag
 * defaults to {@code false} and is handed to {@link CheckpointMetrics} by {@code build()}.
 *
 * @param unalignedCheckpoint the flag value to store in this builder
 * @return this builder, to allow call chaining
 */
public CheckpointMetricsBuilder setUnalignedCheckpoint(boolean unalignedCheckpoint) {
this.unalignedCheckpoint = unalignedCheckpoint;
return this;
}

public CheckpointMetrics build() {
return new CheckpointMetrics(
checkStateAndGet(bytesProcessedDuringAlignment),
bytesPersistedDuringAlignment,
checkStateAndGet(alignmentDurationNanos),
syncDurationMillis,
asyncDurationMillis,
checkpointStartDelayNanos);
checkpointStartDelayNanos,
unalignedCheckpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ public TaskAcknowledgeResult acknowledgeTask(
metrics.getBytesProcessedDuringAlignment(),
metrics.getBytesPersistedDuringAlignment(),
alignmentDurationMillis,
checkpointStartDelayMillis);
checkpointStartDelayMillis,
metrics.getUnalignedCheckpoint());

statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class SubtaskStateStats implements Serializable {
/** Checkpoint start delay in milliseconds. */
private final long checkpointStartDelay;

/** Whether the checkpoint was completed as an unaligned checkpoint. */
private final boolean unalignedCheckpoint;

SubtaskStateStats(
int subtaskIndex,
long ackTimestamp,
Expand All @@ -67,7 +70,8 @@ public class SubtaskStateStats implements Serializable {
long processedData,
long persistedData,
long alignmentDuration,
long checkpointStartDelay) {
long checkpointStartDelay,
boolean unalignedCheckpoint) {

checkArgument(subtaskIndex >= 0, "Negative subtask index");
this.subtaskIndex = subtaskIndex;
Expand All @@ -80,6 +84,7 @@ public class SubtaskStateStats implements Serializable {
this.persistedData = persistedData;
this.alignmentDuration = alignmentDuration;
this.checkpointStartDelay = checkpointStartDelay;
this.unalignedCheckpoint = unalignedCheckpoint;
}

public int getSubtaskIndex() {
Expand Down Expand Up @@ -154,4 +159,8 @@ public long getAlignmentDuration() {
public long getCheckpointStartDelay() {
return checkpointStartDelay;
}

/**
 * Returns whether the checkpoint was completed as an unaligned checkpoint for this subtask.
 *
 * @return the value of the {@code unalignedCheckpoint} flag supplied at construction
 */
public boolean getUnalignedCheckpoint() {
return unalignedCheckpoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private static List<SubtaskCheckpointStatistics> createSubtaskCheckpointStatisti
subtask.getProcessedData(),
subtask.getPersistedData(),
subtask.getAlignmentDuration()),
subtask.getCheckpointStartDelay()));
subtask.getCheckpointStartDelay(),
subtask.getUnalignedCheckpoint()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public static final class CompletedSubtaskCheckpointStatistics

public static final String FIELD_NAME_START_DELAY = "start_delay";

public static final String FIELD_NAME_UNALIGNED_CHECKPOINT = "unaligned_checkpoint";

@JsonProperty(FIELD_NAME_ACK_TIMESTAMP)
private final long ackTimestamp;

Expand All @@ -124,6 +126,9 @@ public static final class CompletedSubtaskCheckpointStatistics
@JsonProperty(FIELD_NAME_START_DELAY)
private final long startDelay;

/**
 * Unaligned-checkpoint flag, serialized as {@code "unaligned_checkpoint"}. Declared {@code
 * final} like the other statistics fields: it is assigned exactly once, in the constructor.
 */
@JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINT)
private final boolean unalignedCheckpoint;

@JsonCreator
public CompletedSubtaskCheckpointStatistics(
@JsonProperty(FIELD_NAME_INDEX) int index,
Expand All @@ -132,14 +137,16 @@ public CompletedSubtaskCheckpointStatistics(
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) CheckpointDuration checkpointDuration,
@JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment,
@JsonProperty(FIELD_NAME_START_DELAY) long startDelay) {
@JsonProperty(FIELD_NAME_START_DELAY) long startDelay,
@JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINT) boolean unalignedCheckpoint) {
super(index, "completed");
this.ackTimestamp = ackTimestamp;
this.duration = duration;
this.stateSize = stateSize;
this.checkpointDuration = checkpointDuration;
this.alignment = alignment;
this.startDelay = startDelay;
this.unalignedCheckpoint = unalignedCheckpoint;
}

public long getAckTimestamp() {
Expand All @@ -166,6 +173,10 @@ public long getStartDelay() {
return startDelay;
}

/**
 * Returns the unaligned-checkpoint flag of this completed subtask statistic, i.e. the value
 * exposed in the JSON response under {@code "unaligned_checkpoint"}.
 *
 * @return the value of the {@code unalignedCheckpoint} flag supplied at construction
 */
public boolean getUnalignedCheckpoint() {
return unalignedCheckpoint;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -180,13 +191,20 @@ public boolean equals(Object o) {
&& stateSize == that.stateSize
&& Objects.equals(checkpointDuration, that.checkpointDuration)
&& Objects.equals(alignment, that.alignment)
&& startDelay == that.startDelay;
&& startDelay == that.startDelay
&& unalignedCheckpoint == that.unalignedCheckpoint;
}

@Override
public int hashCode() {
return Objects.hash(
ackTimestamp, duration, stateSize, checkpointDuration, alignment, startDelay);
ackTimestamp,
duration,
stateSize,
checkpointDuration,
alignment,
startDelay,
unalignedCheckpoint);
}

/** Duration of the checkpoint. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
processedData,
persistedData,
ignored,
ignored);
ignored,
false);

assertTrue(pending.reportSubtaskStats(jobVertex.getJobVertexId(), subtaskStats));

Expand Down Expand Up @@ -548,6 +549,6 @@ static CheckpointStatsTracker createTestTracker() {
}

private SubtaskStateStats createSubtaskStats(int index) {
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0);
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void testIsJavaSerializable() throws Exception {
123129837912L,
42L,
44L,
new SubtaskStateStats(123, 213123, 123123, 0, 0, 0, 0, 0, 0),
new SubtaskStateStats(123, 213123, 123123, 0, 0, 0, 0, 0, 0, false),
null);

CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ private SubtaskStateStats createSubtaskStats(int index) {
Integer.MAX_VALUE + (long) index,
Integer.MAX_VALUE + (long) index,
Integer.MAX_VALUE + (long) index,
Integer.MAX_VALUE + (long) index);
Integer.MAX_VALUE + (long) index,
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public void test(boolean serialize) throws Exception {
Integer.MAX_VALUE + 8L,
Integer.MAX_VALUE + 9L,
Integer.MAX_VALUE + 6L,
Integer.MAX_VALUE + 7L);
Integer.MAX_VALUE + 7L,
false);

stats = serialize ? CommonTestUtils.createCopySerializable(stats) : stats;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ private void test(boolean serialize) throws Exception {
rand.nextInt(128),
rand.nextInt(128),
rand.nextInt(128),
rand.nextInt(128));
rand.nextInt(128),
false);

stateSize += subtasks[i].getStateSize();
processedData += subtasks[i].getProcessedData();
Expand All @@ -94,7 +95,9 @@ private void test(boolean serialize) throws Exception {
assertEquals(persistedData, taskStats.getPersistedDataStats());
}

assertFalse(taskStats.reportSubtaskStats(new SubtaskStateStats(0, 0, 0, 0, 0, 0, 0, 0, 0)));
assertFalse(
taskStats.reportSubtaskStats(
new SubtaskStateStats(0, 0, 0, 0, 0, 0, 0, 0, 0, false)));

taskStats = serialize ? CommonTestUtils.createCopySerializable(taskStats) : taskStats;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ protected TaskCheckpointStatisticsWithSubtaskDetails getTestResponseInstance()
.CheckpointDuration(1L, 2L),
new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics
.CheckpointAlignment(2L, 4L, 5L, 3L),
42L));
42L,
true));

return new TaskCheckpointStatisticsWithSubtaskDetails(
4L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,13 +583,15 @@ private boolean takeSnapshotSync(
}

LOG.debug(
"{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms",
"{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms, is unaligned checkpoint : {}",
taskName,
checkpointId,
checkpointMetrics.getAlignmentDurationNanosOrDefault() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
checkpointMetrics.getSyncDurationMillis(),
checkpointOptions.isUnalignedCheckpoint());

checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1_000_000);
checkpointMetrics.setUnalignedCheckpoint(checkpointOptions.isUnalignedCheckpoint());
return true;
}

Expand Down

0 comments on commit 0abbee9

Please sign in to comment.