Skip to content

Commit

Permalink
[FLINK-16536][network][checkpointing] Implement InputChannel state re…
Browse files Browse the repository at this point in the history
…covery for unaligned checkpoint

During recovery process for unaligned checkpoint, the input channel state should also be recovered besides with existing operator states.

We considered three guarantees during the implementation:
1. Make input recovery happen after the output recovery for providing more floating buffers on output side firstly.
2. Make partition request happen after input recovery for avoiding new data overtaking the previous state data.
3. Introduce a dedicated single IO executor for unspilling the channel state one by one, to avoid potential random IO.

This closes apache#11687.
  • Loading branch information
zhijiangW committed May 12, 2020
1 parent a267019 commit d7525ba
Show file tree
Hide file tree
Showing 39 changed files with 1,499 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public IndexedInputGate getInputGate(int index) {

@Override
public IndexedInputGate[] getAllInputGates() {
throw new UnsupportedOperationException(ERROR_MSG);
return new IndexedInputGate[0];
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
}

protected boolean getNextRecord(T target) throws IOException, InterruptedException {
// The action of partition request was removed from InputGate#setup since FLINK-16536, and this is the only
// unified way for launching partition request for batch jobs. In order to avoid potential performance concern,
// we might consider migrating this action back to the setup based on some condition judgement future.
inputGate.requestPartitions();

if (isFinished) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.InstantiationUtil;

Expand All @@ -59,6 +60,8 @@ public class EventSerializer {

private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;

private static final int END_OF_CHANNEL_STATE_EVENT = 5;

private static final int CHECKPOINT_TYPE_CHECKPOINT = 0;

private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;
Expand All @@ -80,6 +83,9 @@ else if (eventClass == CheckpointBarrier.class) {
else if (eventClass == EndOfSuperstepEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
}
else if (eventClass == EndOfChannelStateEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_CHANNEL_STATE_EVENT });
}
else if (eventClass == CancelCheckpointMarker.class) {
CancelCheckpointMarker marker = (CancelCheckpointMarker) event;

Expand Down Expand Up @@ -129,6 +135,8 @@ private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IO
return type == CHECKPOINT_BARRIER_EVENT;
} else if (eventClass.equals(EndOfSuperstepEvent.class)) {
return type == END_OF_SUPERSTEP_EVENT;
} else if (eventClass.equals(EndOfChannelStateEvent.class)) {
return type == END_OF_CHANNEL_STATE_EVENT;
} else if (eventClass.equals(CancelCheckpointMarker.class)) {
return type == CANCEL_CHECKPOINT_MARKER_EVENT;
} else {
Expand Down Expand Up @@ -162,6 +170,9 @@ else if (type == CHECKPOINT_BARRIER_EVENT) {
else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
}
else if (type == END_OF_CHANNEL_STATE_EVENT) {
return EndOfChannelStateEvent.INSTANCE;
}
else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
long id = buffer.getLong();
return new CancelCheckpointMarker(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
Expand Down Expand Up @@ -87,6 +88,42 @@ Buffer requestBuffer() {
}
}

Buffer requestBufferBlocking() throws IOException, InterruptedException {
synchronized (bufferQueue) {
Buffer buffer;
while ((buffer = bufferQueue.takeBuffer()) == null) {
if (inputChannel.isReleased()) {
throw new CancelTaskException("Input channel [" + inputChannel.channelInfo + "] has already been released.");
}
if (!isWaitingForFloatingBuffers) {
BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
buffer = bufferPool.requestBuffer();
if (buffer == null && shouldContinueRequest(bufferPool)) {
continue;
}
}

if (buffer != null) {
return buffer;
}
bufferQueue.wait();
}
return buffer;
}
}

private boolean shouldContinueRequest(BufferPool bufferPool) {
if (bufferPool.addBufferListener(this)) {
isWaitingForFloatingBuffers = true;
numRequiredBuffers = 1;
return false;
} else if (bufferPool.isDestroyed()) {
throw new CancelTaskException("Local buffer pool has already been released.");
} else {
return true;
}
}

/**
* Requests exclusive buffers from the provider and returns the number of requested amount.
*/
Expand Down Expand Up @@ -156,12 +193,21 @@ public void recycle(MemorySegment segment) {
}
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
} finally {
bufferQueue.notifyAll();
}
}

inputChannel.notifyBufferAvailable(numAddedBuffers);
}

void releaseFloatingBuffers() {
synchronized (bufferQueue) {
numRequiredBuffers = 0;
bufferQueue.releaseFloatingBuffers();
}
}

/**
* Recycles all the exclusive and floating buffers from the given buffer queue.
*/
Expand All @@ -180,6 +226,7 @@ void releaseAllBuffers(ArrayDeque<Buffer> buffers) throws IOException {
}
synchronized (bufferQueue) {
bufferQueue.releaseAll(exclusiveRecyclingSegments);
bufferQueue.notifyAll();
}

if (exclusiveRecyclingSegments.size() > 0) {
Expand Down Expand Up @@ -220,6 +267,7 @@ public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
}

bufferQueue.addFloatingBuffer(buffer);
bufferQueue.notifyAll();

if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
Expand Down Expand Up @@ -350,6 +398,13 @@ void releaseAll(List<MemorySegment> exclusiveSegments) {
}
}

void releaseFloatingBuffers() {
Buffer buffer;
while ((buffer = floatingBuffers.poll()) != null) {
buffer.recycleBuffer();
}
}

int getAvailableBufferSize() {
return floatingBuffers.size() + exclusiveBuffers.size();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.RuntimeEvent;

/**
* This event marks a {@link RecoveredInputChannel} as fully consumed.
*/
public class EndOfChannelStateEvent extends RuntimeEvent {

/** The singleton instance of this event. */
public static final EndOfChannelStateEvent INSTANCE = new EndOfChannelStateEvent();

// ------------------------------------------------------------------------

// not instantiable
private EndOfChannelStateEvent() {}

// ------------------------------------------------------------------------

@Override
public void read(DataInputView in) {
// Nothing to do here
}

@Override
public void write(DataOutputView out) {
// Nothing to do here
}

// ------------------------------------------------------------------------

@Override
public int hashCode() {
return 1965146670;
}

@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
}

@Override
public String toString() {
return getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ public abstract class InputChannel {
// - Partition request backoff --------------------------------------------

/** The initial backoff (in ms). */
private final int initialBackoff;
protected final int initialBackoff;

/** The maximum backoff (in ms). */
private final int maxBackoff;
protected final int maxBackoff;

protected final Counter numBytesIn;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -133,7 +135,18 @@ protected static class InputWithData<INPUT, DATA> {
/**
* Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
*/
public abstract void setup() throws IOException, InterruptedException;
public abstract void setup() throws IOException;

/**
* Reads the previous unaligned checkpoint states before requesting partition data.
*
* @param executor the dedicated executor for performing this action for all the internal channels.
* @param reader the dedicated reader for unspilling the respective channel state from snapshots.
* @return the future indicates whether the recovered states have already been drained or not.
*/
public abstract CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException;

public abstract void requestPartitions() throws IOException;

public abstract void registerBufferReceivedListener(BufferReceivedListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -71,10 +72,10 @@ public LocalInputChannel(
ResultPartitionID partitionId,
ResultPartitionManager partitionManager,
TaskEventPublisher taskEventPublisher,
InputChannelMetrics metrics) {
Counter numBytesIn,
Counter numBuffersIn) {

this(inputGate, channelIndex, partitionId, partitionManager, taskEventPublisher,
0, 0, metrics);
this(inputGate, channelIndex, partitionId, partitionManager, taskEventPublisher, 0, 0, numBytesIn, numBuffersIn);
}

public LocalInputChannel(
Expand All @@ -85,9 +86,10 @@ public LocalInputChannel(
TaskEventPublisher taskEventPublisher,
int initialBackoff,
int maxBackoff,
InputChannelMetrics metrics) {
Counter numBytesIn,
Counter numBuffersIn) {

super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, numBytesIn, numBuffersIn);

this.partitionManager = checkNotNull(partitionManager);
this.taskEventPublisher = checkNotNull(taskEventPublisher);
Expand Down Expand Up @@ -303,4 +305,13 @@ public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IO
// already processed
return true;
}

// ------------------------------------------------------------------------
// Getter
// ------------------------------------------------------------------------

@VisibleForTesting
ResultSubpartitionView getSubpartitionView() {
return subpartitionView;
}
}
Loading

0 comments on commit d7525ba

Please sign in to comment.