Skip to content

Commit

Permalink
[FLINK-24303][coordination] Failure when creating a source enumerator…
Browse files Browse the repository at this point in the history
… lead to full failover, not JobManager failure.

Instead of letting exceptions during the creation of the Source Enumerator bubble up (and utimately fail
the JobManager / Scheduler creation), we now catch those exceptions and trigger a full (global) failover
for that case.

This closes apache#17324
  • Loading branch information
StephanEwen committed Sep 21, 2021
1 parent ff01488 commit d18d1f4
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@

import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.FlinkRuntimeException;

import org.junit.Test;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -60,6 +69,34 @@ public void testMultipleSources() throws Exception {
executeAndVerify(env, stream1.union(stream2), 40);
}

@Test
public void testEnumeratorCreationFails() throws Exception {
OnceFailingToCreateEnumeratorSource.reset();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
final Source<Integer, ?, ?> source =
new OnceFailingToCreateEnumeratorSource(2, 10, Boundedness.BOUNDED);
final DataStream<Integer> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
executeAndVerify(env, stream, 20);
}

@Test
public void testEnumeratorRestoreFails() throws Exception {
OnceFailingToRestoreEnumeratorSource.reset();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
env.enableCheckpointing(10);

final Source<Integer, ?, ?> source =
new OnceFailingToRestoreEnumeratorSource(2, 10, Boundedness.BOUNDED);
final DataStream<Integer> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
executeAndVerify(env, stream, 20);
}

@SuppressWarnings("serial")
private void executeAndVerify(
StreamExecutionEnvironment env, DataStream<Integer> stream, int numRecords)
Expand All @@ -83,4 +120,131 @@ public void invoke(Integer value, Context context) throws Exception {
assertEquals(0, (int) result.get(0));
assertEquals(numRecords - 1, (int) result.get(result.size() - 1));
}

// ------------------------------------------------------------------------

private static class OnceFailingToCreateEnumeratorSource extends MockBaseSource {

private static final long serialVersionUID = 1L;
private static boolean hasFailed;

OnceFailingToCreateEnumeratorSource(
int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
super(numSplits, numRecordsPerSplit, boundedness);
}

@Override
public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
SplitEnumeratorContext<MockSourceSplit> enumContext) {
if (!hasFailed) {
hasFailed = true;
throw new FlinkRuntimeException("Test Failure");
}

return super.createEnumerator(enumContext);
}

static void reset() {
hasFailed = false;
}
}

/**
* A source with the following behavior:
*
* <ol>
* <li>It initially creates an enumerator that does not assign work, waits until the first
* checkpoint completes (which contains all work, because none is assigned, yet) and then
* triggers a global failure.
* <li>Upon restoring from the failure, the first attempt to restore the enumerator fails with
* an exception.
* <li>The next time to restore the enumerator succeeds and the enumerator works regularly.
* </ol>
*/
private static class OnceFailingToRestoreEnumeratorSource extends MockBaseSource {

private static final long serialVersionUID = 1L;
private static boolean hasFailed;

OnceFailingToRestoreEnumeratorSource(
int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
super(numSplits, numRecordsPerSplit, boundedness);
}

@Override
public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
SplitEnumeratorContext<MockSourceSplit> enumContext) {

final SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> enumerator =
super.createEnumerator(enumContext);

if (hasFailed) {
// after the failure happened, we proceed normally
return enumerator;
} else {
// before the failure, we go with
try {
final List<MockSourceSplit> splits = enumerator.snapshotState(1L);
return new NonAssigningEnumerator(splits, enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(e.getMessage(), e);
}
}
}

@Override
public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator(
SplitEnumeratorContext<MockSourceSplit> enumContext,
List<MockSourceSplit> checkpoint)
throws IOException {
if (!hasFailed) {
hasFailed = true;
throw new FlinkRuntimeException("Test Failure");
}

return super.restoreEnumerator(enumContext, checkpoint);
}

static void reset() {
hasFailed = false;
}

/**
* This enumerator does not assign work, so all state is in the checkpoint. After the first
* checkpoint is complete, it triggers a global failure.
*/
private static class NonAssigningEnumerator extends MockSplitEnumerator {

private final SplitEnumeratorContext<?> context;

NonAssigningEnumerator(
List<MockSourceSplit> splits, SplitEnumeratorContext<MockSourceSplit> context) {
super(splits, context);
this.context = context;
}

@Override
public void addReader(int subtaskId) {
// we do nothing here to make sure there is no progress
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
// we do nothing here to make sure there is no progress
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// This is a bit of a clumsy way to trigger a global failover from a coordinator.
// This is safe, though, because per the contract, exceptions in the enumerator
// handlers trigger a global failover.
context.callAsync(
() -> null,
(success, failure) -> {
throw new FlinkRuntimeException(
"Artificial trigger for Global Failover");
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public SourceCoordinator(
public void start() throws Exception {
LOG.info("Starting split enumerator for source {}.", operatorName);

// we mark this as started first, so that we can later distinguish the cases where
// 'start()' wasn't called and where 'start()' failed.
started = true;

// there are two ways the coordinator can get created:
// (1) Source.restoreEnumerator(), in which case the 'resetToCheckpoint()' method creates
// it
Expand All @@ -122,13 +126,17 @@ public void start() throws Exception {
try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
enumerator = source.createEnumerator(context);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to create Source Enumerator for source {}", operatorName, t);
context.failJob(t);
return;
}
}

// The start sequence is the first task in the coordinator executor.
// We rely on the single-threaded coordinator executor to guarantee
// the other methods are invoked after the enumerator has started.
started = true;
runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
}

Expand Down Expand Up @@ -309,6 +317,14 @@ private void runInEventLoop(
final Object... actionNameFormatParameters) {

ensureStarted();

// we may end up here even for a non-started enumerator, in case the instantiation
// failed, and we get the 'subtaskFailed()' notification during the failover.
// we need to ignore those.
if (enumerator == null) {
return;
}

coordinatorExecutor.execute(
() -> {
try {
Expand Down Expand Up @@ -410,7 +426,5 @@ private void ensureStarted() {
if (!started) {
throw new IllegalStateException("The coordinator has not started yet.");
}

assert enumerator != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ public void start() {
}
}

@Test
public void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception {
final RuntimeException failureReason = new RuntimeException("Artificial Exception");

final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
OPERATOR_NAME,
coordinatorExecutor,
new EnumeratorCreatingSource<>(
() -> {
throw failureReason;
}),
context);

coordinator.start();

assertTrue(operatorCoordinatorContext.isJobFailed());
assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason());
}

@Test
public void testErrorThrownFromSplitEnumerator() throws Exception {
final Error error = new Error("Test Error");
Expand Down

0 comments on commit d18d1f4

Please sign in to comment.