Skip to content

Commit

Permalink
[FLINK-21990][streaming] Cancel task before clean up if execution was…
Browse files Browse the repository at this point in the history
… failed
  • Loading branch information
akalash authored and rkhachatryan committed Apr 16, 2021
1 parent 43883cc commit 345bf34
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,10 @@ public final void invoke() throws Exception {
} catch (Throwable invokeException) {
failing = !canceled;
try {
if (!canceled) {
cancelTask();
}

cleanUpInvoke();
}
// TODO: investigate why Throwable instead of Exception is used here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
Expand All @@ -90,6 +92,8 @@
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -1577,6 +1581,44 @@ public void testRestorePerformedFromInvoke() throws Exception {
assertThat(task.streamTask.restoreInvocationCount, is(1));
}

/**
 * Starts a {@link SourceStreamTask} around a {@link FailedSource}, triggers a checkpoint once
 * the source is running, and verifies that the task thread terminates with the task in the
 * {@link ExecutionState#FAILED} state.
 */
@Test
public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Exception {
    // given: a source whose snapshotState throws once the source has started running ...
    final FailedSource sourceFailingOnSnapshot = new FailedSource();

    // ... wrapped into the operator configuration of the task ...
    final StreamConfig streamConfig = new StreamConfig(new Configuration());
    streamConfig.setStateKeySerializer(mock(TypeSerializer.class));
    streamConfig.setOperatorID(new OperatorID(4712L, 43L));
    streamConfig.setStreamOperator(
            new TestStreamSource<String, FailedSource>(sourceFailingOnSnapshot));
    streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    // ... and a task manager configuration selecting the test memory state backend.
    final Configuration tmConfiguration = new Configuration();
    tmConfiguration.setString(
            StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());

    try (NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build()) {
        final Task sourceTask =
                createTask(SourceStreamTask.class, environment, streamConfig, tmConfiguration);

        // when: the task is started ...
        sourceTask.startTaskThread();

        // ... and the source has entered its run loop ...
        sourceFailingOnSnapshot.awaitRunning();

        // ... a checkpoint is triggered, which should make the source throw.
        sourceTask.triggerCheckpointBarrier(
                42L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());

        // Block until the task thread has terminated.
        sourceTask.getExecutingThread().join();

        // then: the task did not hang and ended up in the FAILED state.
        assertEquals(ExecutionState.FAILED, sourceTask.getExecutionState());
    }
}

private MockEnvironment setupEnvironment(boolean... outputAvailabilities) {
final Configuration configuration = new Configuration();
new MockStreamConfig(configuration, outputAvailabilities.length);
Expand Down Expand Up @@ -2419,4 +2461,51 @@ public void notifyCheckpointComplete(long checkpointId) {
throw new ExpectedTestException();
}
}

/**
 * Test source that emits nothing, idles until it is cancelled, and throws from
 * {@link #snapshotState(FunctionSnapshotContext)} once {@link #run(SourceContext)} has been
 * entered.
 */
private static class FailedSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {

    /**
     * Counted down when {@link #run(SourceContext)} is entered; replaced by a fresh latch every
     * time the constructor runs.
     *
     * <p>NOTE(review): presumably static so that the instance held by the test and the
     * instance executed by the task observe the same latch - confirm.
     */
    private static CountDownLatch runningLatch;

    /** Loop flag of {@link #run(SourceContext)}; set in {@link #open} and cleared in {@link #cancel}. */
    private volatile boolean running;

    public FailedSource() {
        runningLatch = new CountDownLatch(1);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        running = true;
    }

    /** Signals that the source is running, then sleeps until {@link #cancel()} clears the flag. */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        runningLatch.countDown();
        while (running) {
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException ignored) {
                // Deliberately swallowed: the loop condition alone decides when to stop.
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    /** Succeeds silently before the source has started running, fails afterwards. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        final boolean notYetRunning = runningLatch.getCount() != 0;
        if (notYetRunning) {
            return;
        }
        throw new RuntimeException("source failed");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // No state to restore.
    }

    /** Blocks the caller until {@link #run(SourceContext)} has been entered. */
    public void awaitRunning() throws InterruptedException {
        runningLatch.await();
    }
}
}

0 comments on commit 345bf34

Please sign in to comment.