
[FLINK-17719][task][checkpointing] Provide ChannelStateReader#hasStates for hints of reading channel states

Currently we rely on whether unaligned checkpointing is enabled to decide whether to read recovered channel states during task startup. That blocks recovery from previously written unaligned states when the current mode is aligned. Instead, `ChannelStateReader` can hint whether there are any channel states to be read during startup, so we never lose the chance to recover from them.
zhijiangW authored and pnowojski committed May 16, 2020
1 parent 909a13d commit 2b42209
Showing 6 changed files with 79 additions and 17 deletions.
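
Before the per-file diffs, here is a minimal sketch of the startup decision this commit introduces. It is illustrative only and simply mirrors the StreamTask change shown below (getChannelStateReader(), hasChannelStates() and requestPartitions() all appear in the diff):

// Previously, recovery was skipped whenever unaligned checkpoints were disabled.
// Now it is skipped only when the reader reports that it holds no channel states.
ChannelStateReader reader = getEnvironment().getTaskStateManager().getChannelStateReader();
if (!reader.hasChannelStates()) {
	requestPartitions();  // nothing to recover, so request partitions right away
	return;
}
// ...otherwise recovered state is first read into the result partitions and input gates.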
ChannelStateReader.java
@@ -33,6 +33,11 @@ public interface ChannelStateReader extends AutoCloseable {
*/
enum ReadResult { HAS_MORE_DATA, NO_MORE_DATA }

/**
* Return whether there are any channel states to be read.
*/
boolean hasChannelStates();

/**
* Put data into the supplied buffer to be injected into
* {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}.
@@ -50,6 +55,11 @@ enum ReadResult { HAS_MORE_DATA, NO_MORE_DATA }

ChannelStateReader NO_OP = new ChannelStateReader() {

@Override
public boolean hasChannelStates() {
return false;
}

@Override
public ReadResult readInputData(InputChannelInfo info, Buffer buffer) {
return ReadResult.NO_MORE_DATA;
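
As a quick illustration (not part of this commit), the NO_OP reader now reports that it has no channel states, so a task wired with it skips recovery entirely. A minimal self-contained check, assuming flink-runtime is on the classpath; the class name NoOpReaderCheck exists only for this example:

import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;

public class NoOpReaderCheck {
	public static void main(String[] args) {
		// The no-op reader never has channel states to read, so a task holding it
		// can request partitions immediately instead of reading recovered state first.
		ChannelStateReader reader = ChannelStateReader.NO_OP;
		System.out.println(reader.hasChannelStates()); // prints: false
	}
}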
ChannelStateReaderImpl.java
@@ -87,6 +87,11 @@ private <T> void addReaders(
}
}

@Override
public boolean hasChannelStates() {
return !(inputChannelHandleReaders.isEmpty() && resultSubpartitionHandleReaders.isEmpty());
}

@Override
public ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException {
Preconditions.checkState(!isClosed, "reader is closed");
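
The implementation above reports states whenever at least one handle reader has been registered. For clarity only (same field names as the diff, not a proposed change), the check is equivalent by De Morgan's law to:

// There is something to read if either map holds at least one handle reader.
return !inputChannelHandleReaders.isEmpty() || !resultSubpartitionHandleReaders.isEmpty();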
TaskStateManagerImpl.java
@@ -88,7 +88,7 @@ public TaskStateManagerImpl(
);
}

- TaskStateManagerImpl(
+ public TaskStateManagerImpl(
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
ResultPartitionTest.java
@@ -571,6 +571,11 @@ public FiniteChannelStateReader(int totalStates, int[] states) {
this.states = states;
}

@Override
public boolean hasChannelStates() {
return true;
}

@Override
public ReadResult readInputData(InputChannelInfo info, Buffer buffer) {
for (int state: states) {
@@ -606,6 +611,11 @@ public void close() {
*/
public static final class ChannelStateReaderWithException implements ChannelStateReader {

@Override
public boolean hasChannelStates() {
return true;
}

@Override
public ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException {
throw new IOException("test");
StreamTask.java
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
@@ -474,15 +475,16 @@ protected void beforeInvoke() throws Exception {
}

private void readRecoveredChannelState() throws IOException, InterruptedException {
- //TODO we will support channel state recovery even if the current setting is not unaligned checkpoint.
- if (!configuration.isUnalignedCheckpointsEnabled()) {
+ ChannelStateReader reader = getEnvironment().getTaskStateManager().getChannelStateReader();
+ if (!reader.hasChannelStates()) {
requestPartitions();
return;
}

ResultPartitionWriter[] writers = getEnvironment().getAllWriters();
if (writers != null) {
for (ResultPartitionWriter writer : writers) {
- writer.readRecoveredState(getEnvironment().getTaskStateManager().getChannelStateReader());
+ writer.readRecoveredState(reader);
}
}

@@ -492,8 +494,7 @@ private void readRecoveredChannelState() throws IOException, InterruptedException {
if (inputGates != null && inputGates.length > 0) {
CompletableFuture[] futures = new CompletableFuture[inputGates.length];
for (int i = 0; i < inputGates.length; i++) {
- futures[i] = inputGates[i].readRecoveredState(
-     channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+ futures[i] = inputGates[i].readRecoveredState(channelIOExecutor, reader);
}

// Note that we must request partition after all the single gates finished recovery.
StreamTaskTest.java
@@ -47,6 +47,7 @@
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -74,16 +75,17 @@
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -955,7 +957,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
}

@Test
- public void testInitializeChannelState() throws Exception {
+ public void testBeforeInvokeWithoutChannelStates() throws Exception {
int numWriters = 2;
int numGates = 2;
RecoveryResultPartition[] partitions = new RecoveryResultPartition[numWriters];
@@ -967,18 +969,52 @@ public void testInitializeChannelState() throws Exception {
gates[i] = new RecoveryInputGate(partitions);
}

- Configuration configuration = new Configuration();
- configuration.setBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
- MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskConfiguration(configuration).build();
+ MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
mockEnvironment.addOutputs(Arrays.asList(partitions));
mockEnvironment.addInputs(Arrays.asList(gates));
StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build();
try {
- verifyResults(gates, partitions, false);
+ verifyResults(gates, partitions, false, false);

task.beforeInvoke();

- verifyResults(gates, partitions, true);
+ verifyResults(gates, partitions, false, true);
} finally {
task.cleanUpInvoke();
}
}

@Test
public void testBeforeInvokeWithChannelStates() throws Exception {
int numWriters = 2;
int numGates = 2;
RecoveryResultPartition[] partitions = new RecoveryResultPartition[numWriters];
for (int i = 0; i < numWriters; i++) {
partitions[i] = new RecoveryResultPartition();
}
RecoveryInputGate[] gates = new RecoveryInputGate[numGates];
for (int i = 0; i < numGates; i++) {
gates[i] = new RecoveryInputGate(partitions);
}

ChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(1, new int[] {0});
TaskStateManager taskStateManager = new TaskStateManagerImpl(
new JobID(),
new ExecutionAttemptID(),
new TestTaskLocalStateStore(),
null,
NoOpCheckpointResponder.INSTANCE,
reader);
MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager(taskStateManager).build();
mockEnvironment.addOutputs(Arrays.asList(partitions));
mockEnvironment.addInputs(Arrays.asList(gates));
StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build();
try {
verifyResults(gates, partitions, false, false);

task.beforeInvoke();

verifyResults(gates, partitions, true, false);

// execute the partition request mail inserted after input recovery completes
task.mailboxProcessor.drain();
@@ -991,13 +1027,13 @@ public void testInitializeChannelState() throws Exception {
}
}

- private void verifyResults(RecoveryInputGate[] gates, RecoveryResultPartition[] partitions, boolean expected) {
+ private void verifyResults(RecoveryInputGate[] gates, RecoveryResultPartition[] partitions, boolean recoveryExpected, boolean requestExpected) {
for (RecoveryResultPartition resultPartition : partitions) {
- assertEquals(expected, resultPartition.isStateRecovered());
+ assertEquals(recoveryExpected, resultPartition.isStateRecovered());
}
for (RecoveryInputGate inputGate : gates) {
- assertEquals(expected, inputGate.isStateRecovered());
- assertFalse(inputGate.isPartitionRequested());
+ assertEquals(recoveryExpected, inputGate.isStateRecovered());
+ assertEquals(requestExpected, inputGate.isPartitionRequested());
}
}

