From 345bf34b858d3c9014559dfab5a4e0dea5b0832d Mon Sep 17 00:00:00 2001 From: Anton Kalashnikov Date: Tue, 6 Apr 2021 17:13:31 +0200 Subject: [PATCH] [FLINK-21990][streaming] Cancel task before clean up if execution was failed --- .../streaming/runtime/tasks/StreamTask.java | 4 + .../runtime/tasks/StreamTaskTest.java | 89 +++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index c412da9ab26a3..9ee0ad3a1f1eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -630,6 +630,10 @@ public final void invoke() throws Exception { } catch (Throwable invokeException) { failing = !canceled; try { + if (!canceled) { + cancelTask(); + } + cleanUpInvoke(); } // TODO: investigate why Throwable instead of Exception is used here. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 729ebf87eb46c..9f0f594762862 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -66,6 +66,8 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -90,6 +92,8 @@ import org.apache.flink.runtime.taskmanager.TestTaskBuilder; import org.apache.flink.runtime.util.FatalExitExceptionHandler; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -1577,6 +1581,44 @@ public void testRestorePerformedFromInvoke() throws Exception { assertThat(task.streamTask.restoreInvocationCount, is(1)); } + @Test + public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Exception { + // given: Configured SourceStreamTask with source which fails on checkpoint. + Configuration taskManagerConfig = new Configuration(); + taskManagerConfig.setString( + StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setStateKeySerializer(mock(TypeSerializer.class)); + cfg.setOperatorID(new OperatorID(4712L, 43L)); + + FailedSource failedSource = new FailedSource(); + cfg.setStreamOperator(new TestStreamSource(failedSource)); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + try (NettyShuffleEnvironment shuffleEnvironment = + new NettyShuffleEnvironmentBuilder().build()) { + Task task = + createTask(SourceStreamTask.class, shuffleEnvironment, cfg, taskManagerConfig); + + // when: Task starts + task.startTaskThread(); + + // wait for the task starts doing the work. + failedSource.awaitRunning(); + + // and: Checkpoint is triggered which should lead to exception in Source. + task.triggerCheckpointBarrier( + 42L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()); + + // wait for clean termination. + task.getExecutingThread().join(); + + // then: The task doesn't hang but finished with FAILED state. + assertEquals(ExecutionState.FAILED, task.getExecutionState()); + } + } + private MockEnvironment setupEnvironment(boolean... outputAvailabilities) { final Configuration configuration = new Configuration(); new MockStreamConfig(configuration, outputAvailabilities.length); @@ -2419,4 +2461,51 @@ public void notifyCheckpointComplete(long checkpointId) { throw new ExpectedTestException(); } } + + private static class FailedSource extends RichParallelSourceFunction + implements CheckpointedFunction { + private static CountDownLatch runningLatch = null; + + private volatile boolean running; + + public FailedSource() { + runningLatch = new CountDownLatch(1); + } + + @Override + public void open(Configuration parameters) throws Exception { + running = true; + } + + @Override + public void run(SourceContext ctx) throws Exception { + runningLatch.countDown(); + while (running) { + try { + Thread.sleep(Integer.MAX_VALUE); + } catch (InterruptedException ignore) { + // ignore + } + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (runningLatch.getCount() == 0) { + throw new RuntimeException("source failed"); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception {} + + public void awaitRunning() throws InterruptedException { + runningLatch.await(); + } + } }