Skip to content

Commit

Permalink
[FLINK-16404][runtime] Avoid caching buffers for blocked input channe…
Browse files Browse the repository at this point in the history
…ls before barrier alignment

This commit is the first part of implementation to solve the dead lock problem when reducing the exclusive buffer of receiver side to 0.

Reducing the number of exclusive buffers of receiver side to 0 can bring several advantages (may at the cost of some performance regression). One is that memory can be saved from the reduced network buffer usage. Another important benefit is that the in-flight data can be reduced so we can speed up checkpoint in cases of back pressure. However, for the current implementation, reducing the exclusive buffer of receiver side can incur deadlock problem because all the floating buffers might be requested away by some blocked input channels and never recycled until barrier alignment.

To solve the problem, this commit mainly makes the following changes:
1. At sender side, after sending a checkpoint barrier when aligned exactly-once checkpoint mode is used, the outgoing channel will be blocked and no data will be sent out until the channel is unblocked.
2. At receiver side, no buffer will be stored in BufferStorage any more and after a checkpoint is completed or canceled, the receiver side will resume data consumption and unblock the upstream by sending a special event to the sender side.

Note that after this patch we still can't set the exclusive buffer of receiver side to 0 because there is still deadlock problem which will be totally solved in the following up patches.
  • Loading branch information
wsry authored and zhijiangW committed Apr 27, 2020
1 parent 9069ac4 commit 2e313f0
Show file tree
Hide file tree
Showing 139 changed files with 1,382 additions and 2,889 deletions.
5 changes: 0 additions & 5 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1319,11 +1319,6 @@ Metrics related to data exchange between task executors using netty network comm
<td>Timestamp when the last checkpoint was restored at the coordinator (in milliseconds).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointAlignmentBuffered</td>
<td>The number of buffered bytes during alignment over all subtasks for the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>numberOfInProgressCheckpoints</td>
<td>The number of in progress checkpoints.</td>
Expand Down
5 changes: 0 additions & 5 deletions docs/monitoring/metrics.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -1317,11 +1317,6 @@ Metrics related to data exchange between task executors using netty network comm
<td>Timestamp when the last checkpoint was restored at the coordinator (in milliseconds).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointAlignmentBuffered</td>
<td>The number of buffered bytes during alignment over all subtasks for the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>numberOfInProgressCheckpoints</td>
<td>The number of in progress checkpoints.</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,22 +532,6 @@ public class TaskManagerOptions {
.withDescription("Time we wait for the timers in milliseconds to finish all pending timer threads" +
" when the stream task is cancelled.");

/**
* The maximum number of bytes that a checkpoint alignment may buffer.
* If the checkpoint alignment buffers more than the configured amount of
* data, the checkpoint is aborted (skipped).
*
* <p>The default value of {@code -1} indicates that there is no limit.
*/
@Documentation.ExcludeFromDocumentation("With flow control, there is no alignment spilling any more")
public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
key("task.checkpoint.alignment.max-size")
.longType()
.defaultValue(-1L)
.withDescription("The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint" +
" alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped)." +
" A value of -1 indicates that there is no limit.");

// ------------------------------------------------------------------------

/** Not intended to be instantiated. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
OP operator,
int index,
long timestamp,
boolean isExactlyOnceMode,
boolean isUnalignedCheckpoint,
CheckpointStorageWorkerView checkpointStorage,
Path savepointPath) throws Exception {

CheckpointOptions options = new CheckpointOptions(
CheckpointType.SAVEPOINT,
AbstractFsCheckpointStorage.encodePathAsReference(savepointPath));
AbstractFsCheckpointStorage.encodePathAsReference(savepointPath),
isExactlyOnceMode,
isUnalignedCheckpoint);

operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void endInput() throws Exception {
this,
getRuntimeContext().getIndexOfThisSubtask(),
timestamp,
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
getContainingTask().getCheckpointStorage(),
savepointPath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public void endInput() throws Exception {
this,
getRuntimeContext().getIndexOfThisSubtask(),
timestamp,
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
getContainingTask().getCheckpointStorage(),
savepointPath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public void endInput() throws Exception {
this,
getRuntimeContext().getIndexOfThisSubtask(),
timestamp,
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
getContainingTask().getCheckpointStorage(),
savepointPath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testSnapshotUtilsLifecycle() throws Exception {

Path path = new Path(folder.newFolder().getAbsolutePath());

SnapshotUtils.snapshot(operator, 0, 0L, storage, path);
SnapshotUtils.snapshot(operator, 0, 0L, true, false, storage, path);

Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void testJobManagerJMXMetricAccess() throws Exception {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false,
false,
0),
null));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,6 @@ public abstract class AbstractCheckpointStats implements Serializable {
*/
public abstract long getStateSize();

/**
* Returns the total buffered bytes during alignment over all subtasks.
*
* <p>Can return <code>-1</code> if the runtime did not report this.
*
* @return Total buffered bytes during alignment over all subtasks.
*/
public abstract long getAlignmentBuffered();

/**
* Returns the latest acknowledged subtask stats or <code>null</code> if
* none was acknowledged yet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public class CheckpointCoordinator {

private final Clock clock;

private final boolean isExactlyOnceMode;

private final boolean isUnalignedCheckpoint;

/** Flag represents there is an in-flight trigger request. */
private boolean isTriggering = false;

Expand Down Expand Up @@ -285,6 +289,8 @@ public CheckpointCoordinator(
this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
this.failureManager = checkNotNull(failureManager);
this.clock = checkNotNull(clock);
this.isExactlyOnceMode = chkConfig.isExactlyOnce();
this.isUnalignedCheckpoint = chkConfig.isUnalignedCheckpoint();

this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
Expand Down Expand Up @@ -754,7 +760,9 @@ private void snapshotTaskState(

final CheckpointOptions checkpointOptions = new CheckpointOptions(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference());
checkpointStorageLocation.getLocationReference(),
isExactlyOnceMode,
isUnalignedCheckpoint);

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ public class CheckpointMetrics implements Serializable {

private static final long serialVersionUID = 1L;

/** The number of bytes that were buffered during the checkpoint alignment phase. */
private long bytesBufferedInAlignment;

/** The duration (in nanoseconds) that the stream alignment for the checkpoint took. */
private long alignmentDurationNanos;

Expand All @@ -59,21 +56,11 @@ public CheckpointMetrics(
checkArgument(bytesBufferedInAlignment >= -1);
checkArgument(alignmentDurationNanos >= -1);

this.bytesBufferedInAlignment = bytesBufferedInAlignment;
this.alignmentDurationNanos = alignmentDurationNanos;
this.syncDurationMillis = syncDurationMillis;
this.asyncDurationMillis = asyncDurationMillis;
}

public long getBytesBufferedInAlignment() {
return bytesBufferedInAlignment;
}

public CheckpointMetrics setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
this.bytesBufferedInAlignment = bytesBufferedInAlignment;
return this;
}

public long getAlignmentDurationNanos() {
return alignmentDurationNanos;
}
Expand Down Expand Up @@ -121,8 +108,7 @@ public boolean equals(Object o) {

CheckpointMetrics that = (CheckpointMetrics) o;

return bytesBufferedInAlignment == that.bytesBufferedInAlignment &&
alignmentDurationNanos == that.alignmentDurationNanos &&
return alignmentDurationNanos == that.alignmentDurationNanos &&
syncDurationMillis == that.syncDurationMillis &&
asyncDurationMillis == that.asyncDurationMillis &&
checkpointStartDelayNanos == that.checkpointStartDelayNanos;
Expand All @@ -132,7 +118,6 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
bytesBufferedInAlignment,
alignmentDurationNanos,
syncDurationMillis,
asyncDurationMillis,
Expand All @@ -142,7 +127,6 @@ public int hashCode() {
@Override
public String toString() {
return "CheckpointMetrics{" +
"bytesBufferedInAlignment=" + bytesBufferedInAlignment +
", alignmentDurationNanos=" + alignmentDurationNanos +
", syncDurationMillis=" + syncDurationMillis +
", asyncDurationMillis=" + asyncDurationMillis +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;

Expand All @@ -42,12 +43,27 @@ public class CheckpointOptions implements Serializable {
/** Target location for the checkpoint. */
private final CheckpointStorageLocationReference targetLocation;

private final boolean isExactlyOnceMode;

private final boolean isUnalignedCheckpoint;

@VisibleForTesting
public CheckpointOptions(
CheckpointType checkpointType,
CheckpointStorageLocationReference targetLocation) {
this(checkpointType, targetLocation, true, false);
}

public CheckpointOptions(
CheckpointType checkpointType,
CheckpointStorageLocationReference targetLocation,
boolean isExactlyOnceMode,
boolean isUnalignedCheckpoint) {

this.checkpointType = checkNotNull(checkpointType);
this.targetLocation = checkNotNull(targetLocation);
this.isExactlyOnceMode = isExactlyOnceMode;
this.isUnalignedCheckpoint = isUnalignedCheckpoint;
}

// ------------------------------------------------------------------------
Expand All @@ -66,11 +82,24 @@ public CheckpointStorageLocationReference getTargetLocation() {
return targetLocation;
}

public boolean isExactlyOnceMode() {
return isExactlyOnceMode;
}

public boolean isUnalignedCheckpoint() {
return isUnalignedCheckpoint;
}

// ------------------------------------------------------------------------

@Override
public int hashCode() {
return 31 * targetLocation.hashCode() + checkpointType.hashCode();
int result = 1;
result = 31 * result + targetLocation.hashCode();
result = 31 * result + checkpointType.hashCode();
result = 31 * result + (isExactlyOnceMode ? 1 : 0);
result = 31 * result + (isUnalignedCheckpoint ? 1 : 0);
return result;
}

@Override
Expand All @@ -81,7 +110,9 @@ public boolean equals(Object obj) {
else if (obj != null && obj.getClass() == CheckpointOptions.class) {
final CheckpointOptions that = (CheckpointOptions) obj;
return this.checkpointType == that.checkpointType &&
this.targetLocation.equals(that.targetLocation);
this.targetLocation.equals(that.targetLocation) &&
this.isExactlyOnceMode == that.isExactlyOnceMode &&
this.isUnalignedCheckpoint == that.isUnalignedCheckpoint;
}
else {
return false;
Expand All @@ -90,7 +121,12 @@ else if (obj != null && obj.getClass() == CheckpointOptions.class) {

@Override
public String toString() {
return "CheckpointOptions: " + checkpointType + " @ " + targetLocation;
return "CheckpointOptions {" +
"checkpointType = " + checkpointType +
", targetLocation = " + targetLocation +
", isExactlyOnceMode = " + isExactlyOnceMode +
", isUnalignedCheckpoint = " + isUnalignedCheckpoint +
"}";
}

// ------------------------------------------------------------------------
Expand All @@ -100,7 +136,18 @@ public String toString() {
private static final CheckpointOptions CHECKPOINT_AT_DEFAULT_LOCATION =
new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());

@VisibleForTesting
public static CheckpointOptions forCheckpointWithDefaultLocation() {
return CHECKPOINT_AT_DEFAULT_LOCATION;
}

public static CheckpointOptions forCheckpointWithDefaultLocation(
boolean isExactlyOnceMode,
boolean isUnalignedCheckpoint) {
return new CheckpointOptions(
CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault(),
isExactlyOnceMode,
isUnalignedCheckpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,6 @@ void reportFailedCheckpoint(FailedCheckpointStats failed) {
@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";

@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC = "lastCheckpointAlignmentBuffered";

@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC = "lastCheckpointExternalPath";

Expand All @@ -360,7 +357,6 @@ private void registerMetrics(MetricGroup metricGroup) {
metricGroup.gauge(LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC, new LatestRestoredCheckpointTimestampGauge());
metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new LatestCompletedCheckpointSizeGauge());
metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC, new LatestCompletedCheckpointDurationGauge());
metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC, new LatestCompletedCheckpointAlignmentBufferedGauge());
metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC, new LatestCompletedCheckpointExternalPathGauge());
}

Expand Down Expand Up @@ -428,18 +424,6 @@ public Long getValue() {
}
}

private class LatestCompletedCheckpointAlignmentBufferedGauge implements Gauge<Long> {
@Override
public Long getValue() {
CompletedCheckpointStats completed = latestCompletedCheckpoint;
if (completed != null) {
return completed.getAlignmentBuffered();
} else {
return -1L;
}
}
}

private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> {
@Override
public String getValue() {
Expand Down
Loading

0 comments on commit 2e313f0

Please sign in to comment.